Author: jbellis
Date: Thu Dec 8 20:31:23 2011
New Revision: 1212088
URL: http://svn.apache.org/viewvc?rev=1212088&view=rev
Log:
multi-dc replication optimization supporting CL > ONE
patch by Vijay and jbellis for CASSANDRA-3577
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1212088&r1=1212087&r2=1212088&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Dec 8 20:31:23 2011
@@ -1,4 +1,5 @@
1.1-dev
+ * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577)
* add command to stop compactions (CASSANDRA-1740, 3566)
* multithreaded streaming (CASSANDRA-3494)
* removed in-tree redhat spec (CASSANDRA-3567)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1212088&r1=1212087&r2=1212088&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Dec 8 20:31:23 2011
@@ -18,18 +18,18 @@
package org.apache.cassandra.db;
+import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-
public class RowMutationVerbHandler implements IVerbHandler
{
@@ -45,7 +45,7 @@ public class RowMutationVerbHandler impl
// Check if there were any forwarding headers in this message
byte[] forwardBytes =
message.getHeader(RowMutation.FORWARD_HEADER);
- if (forwardBytes != null)
+ if (forwardBytes != null && message.getVersion() >=
MessagingService.VERSION_11)
forwardToLocalNodes(message, forwardBytes);
rm.apply();
@@ -62,32 +62,26 @@ public class RowMutationVerbHandler impl
}
}
- private void forwardToLocalNodes(Message message, byte[] forwardBytes)
throws UnknownHostException
+ /**
+ * Older version (< 1.0) will not send this message at all, hence we don't
+ * need to check the version of the data.
+ */
+ private void forwardToLocalNodes(Message message, byte[] forwardBytes)
throws IOException
{
+ DataInputStream dis = new DataInputStream(new
FastByteArrayInputStream(forwardBytes));
+ int size = dis.readInt();
+
// remove fwds from message to avoid infinite loop
Message messageCopy =
message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
-
- int bytesPerInetAddress =
FBUtilities.getBroadcastAddress().getAddress().length;
- assert forwardBytes.length >= bytesPerInetAddress;
- assert forwardBytes.length % bytesPerInetAddress == 0;
-
- int offset = 0;
- byte[] addressBytes = new byte[bytesPerInetAddress];
-
- // Send a message to each of the addresses on our Forward List
- while (offset < forwardBytes.length)
+ for (int i = 0; i < size; i++)
{
- System.arraycopy(forwardBytes, offset, addressBytes, 0,
bytesPerInetAddress);
- InetAddress address = InetAddress.getByAddress(addressBytes);
-
+ // Send a message to each of the addresses on our Forward List
+ InetAddress address =
CompactEndpointSerializationHelper.deserialize(dis);
+ String id = dis.readUTF();
if (logger_.isDebugEnabled())
- logger_.debug("Forwarding message to " + address);
-
- // Send the original message to the address specified by the
FORWARD_HINT
+ logger_.debug("Forwarding message to " + address + " with= ID:
" + id);
// Let the response go back to the coordinator
- MessagingService.instance().sendOneWay(messageCopy, address);
-
- offset += bytesPerInetAddress;
+ MessagingService.instance().sendOneWay(messageCopy, id, address);
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1212088&r1=1212087&r2=1212088&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Dec 8 20:31:23 2011
@@ -341,8 +341,14 @@ public final class MessagingService impl
return verbHandlers_.get(type);
}
- private void addCallback(IMessageCallback cb, String messageId, Message
message, InetAddress to, long timeout)
+ public String addCallback(IMessageCallback cb, Message message,
InetAddress to)
{
+ return addCallback(cb, message, to, DEFAULT_CALLBACK_TIMEOUT);
+ }
+
+ public String addCallback(IMessageCallback cb, Message message,
InetAddress to, long timeout)
+ {
+ String messageId = nextId();
CallbackInfo previous;
// If HH is enabled and this is a mutation message => store the
message to track for potential hints.
@@ -352,6 +358,7 @@ public final class MessagingService impl
previous = callbacks.put(messageId, new CallbackInfo(to, cb),
timeout);
assert previous == null;
+ return messageId;
}
private static AtomicInteger idGen = new AtomicInteger(0);
@@ -384,8 +391,7 @@ public final class MessagingService impl
*/
public String sendRR(Message message, InetAddress to, IMessageCallback cb,
long timeout)
{
- String id = nextId();
- addCallback(cb, id, message, to, timeout);
+ String id = addCallback(cb, message, to, timeout);
sendOneWay(message, id, to);
return id;
}
@@ -426,7 +432,7 @@ public final class MessagingService impl
* @param message messages to be sent.
* @param to endpoint to which the message needs to be sent
*/
- private void sendOneWay(Message message, String id, InetAddress to)
+ public void sendOneWay(Message message, String id, InetAddress to)
{
if (logger_.isTraceEnabled())
logger_.trace(FBUtilities.getBroadcastAddress() + " sending " +
message.getVerb() + " to " + id + "@" + to);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1212088&r1=1212087&r2=1212088&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Dec 8 20:31:23 2011
@@ -400,33 +400,41 @@ public class StorageProxy implements Sto
// a single message object is used for unhinted writes, so
clean out any forwards
// from previous loop iterations
message =
message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
+ Iterator<InetAddress> iter = messages.getValue().iterator();
+ InetAddress target = iter.next();
- if (dataCenter.equals(localDataCenter))
+ // direct writes to local DC or old Cassadra versions
+ if (dataCenter.equals(localDataCenter) ||
Gossiper.instance.getVersion(target) < MessagingService.VERSION_11)
{
- // direct writes to local DC or old Cassadra versions
- for (InetAddress destination : messages.getValue())
- MessagingService.instance().sendRR(message,
destination, handler);
- }
- else
- {
- // Non-local DC. First endpoint in list is the destination
for this group
- Iterator<InetAddress> iter =
messages.getValue().iterator();
- InetAddress target = iter.next();
- // Add all the other destinations of the same message as a
header in the primary message.
- if (iter.hasNext())
- {
- FastByteArrayOutputStream bos = new
FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- while (iter.hasNext())
- {
- InetAddress destination = iter.next();
- dos.write(destination.getAddress());
- }
- message =
message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray());
- }
- // send the combined message + forward headers
+ // yes, the loop and non-loop code here are the same; this
is clunky but we want to avoid
+ // creating a second iterator since we already have a
perfectly good one
MessagingService.instance().sendRR(message, target,
handler);
+ while (iter.hasNext())
+ {
+ target = iter.next();
+ MessagingService.instance().sendRR(message, target,
handler);
+ }
+ continue;
+ }
+
+ // Add all the other destinations of the same message as a
FORWARD_HEADER entry
+ FastByteArrayOutputStream bos = new
FastByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeInt(messages.getValue().size() - 1);
+ while (iter.hasNext())
+ {
+ InetAddress destination = iter.next();
+ CompactEndpointSerializationHelper.serialize(destination,
dos);
+ String id =
MessagingService.instance().addCallback(handler, message, destination);
+ dos.writeUTF(id);
+ if (logger.isDebugEnabled())
+ logger.debug("Adding FWD message to: " + destination +
" with ID " + id);
}
+ message = message.withHeaderAdded(RowMutation.FORWARD_HEADER,
bos.toByteArray());
+ // send the combined message + forward headers
+ String id = MessagingService.instance().sendRR(message,
target, handler);
+ if (logger.isDebugEnabled())
+ logger.debug("Sending message to: " + target + " with ID "
+ id);
}
}
}