Author: jbellis
Date: Thu Dec 2 17:42:15 2010
New Revision: 1041489
URL: http://svn.apache.org/viewvc?rev=1041489&view=rev
Log:
revert last
Added:
cassandra/branches/cassandra-0.7/contrib/maven/
- copied from r1041485, cassandra/branches/cassandra-0.7/contrib/maven/
cassandra/branches/cassandra-0.7/contrib/maven/pom.xml
- copied unchanged from r1041485,
cassandra/branches/cassandra-0.7/contrib/maven/pom.xml
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java?rev=1041489&r1=1041488&r2=1041489&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java Thu Dec 2 17:42:15 2010
@@ -47,7 +47,6 @@ public class RowMutation
{
private static ICompactSerializer<RowMutation> serializer_;
public static final String HINT = "HINT";
- public static final String FORWARD_HEADER = "FORWARD";
static
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1041489&r1=1041488&r2=1041489&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Dec 2 17:42:15 2010
@@ -18,23 +18,25 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
+import java.io.*;
+
import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import com.google.common.base.Charsets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.net.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import static com.google.common.base.Charsets.UTF_8;
+
public class RowMutationVerbHandler implements IVerbHandler
{
@@ -67,11 +69,6 @@ public class RowMutationVerbHandler impl
hintedMutation.apply();
}
}
-
- // Check if there were any forwarding headers in this message
- byte[] forwardBytes = message.getHeader(RowMutation.FORWARD_HEADER);
- if (forwardBytes != null)
- forwardToLocalNodes(message, forwardBytes);
Table.open(rm.getTable()).apply(rm, bytes, true);
@@ -85,34 +82,5 @@ public class RowMutationVerbHandler impl
{
logger_.error("Error in row mutation", e);
}
- }
-
- private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException
- {
- // remove fwds from message to avoid infinite loop
- message.setHeader(RowMutation.FORWARD_HEADER, null);
-
- int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length;
- assert forwardBytes.length >= bytesPerInetAddress;
- assert forwardBytes.length % bytesPerInetAddress == 0;
-
- int offset = 0;
- byte[] addressBytes = new byte[bytesPerInetAddress];
-
- // Send a message to each of the addresses on our Forward List
- while (offset < forwardBytes.length)
- {
- System.arraycopy(forwardBytes, offset, addressBytes, 0, bytesPerInetAddress);
- InetAddress address = InetAddress.getByAddress(addressBytes);
-
- if (logger_.isDebugEnabled())
- logger_.debug("Forwarding message to " + address);
-
- // Send the original message to the address specified by the FORWARD_HINT
- // Let the response go back to the coordinator
- MessagingService.instance.sendOneWay(message, message.getFrom());
-
- offset += bytesPerInetAddress;
- }
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1041489&r1=1041488&r2=1041489&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Thu Dec 2 17:42:15 2010
@@ -27,23 +27,20 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import static com.google.common.base.Charsets.UTF_8;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
@@ -56,8 +53,7 @@ import org.apache.cassandra.utils.FBUtil
import org.apache.cassandra.utils.LatencyTracker;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
-
-import static com.google.common.base.Charsets.UTF_8;
+import org.apache.cassandra.db.filter.QueryFilter;
public class StorageProxy implements StorageProxyMBean
{
@@ -94,14 +90,13 @@ public class StorageProxy implements Sto
* @param mutations the mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
- public static void mutate(List<RowMutation> mutations, ConsistencyLevel consistencyLevel) throws UnavailableException, TimeoutException
+ public static void mutate(List<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
{
long startTime = System.nanoTime();
- List<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
+ ArrayList<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
RowMutation mostRecentRowMutation = null;
StorageService ss = StorageService.instance;
- String localDataCenter = getDataCenter(FBUtilities.getLocalAddress());
try
{
@@ -115,67 +110,58 @@ public class StorageProxy implements Sto
Collection<InetAddress> writeEndpoints =
ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()),
table, naturalEndpoints);
Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
- final IWriteResponseHandler responseHandler =
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel);
-
- // exit early if we can't fulfuill the CL at this time
+ // send out the writes, as in mutate() above, but this time
with a callback that tracks responses
+ final IWriteResponseHandler responseHandler =
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
responseHandler.assureSufficientLiveNodes();
-
- responseHandlers.add(responseHandler);
-
- // Creates a Multimap that holds onto all the messages and
addresses meant for a specific datacenter.
- Multimap<String, Pair<Message, InetAddress>> dcMap =
groupEndpointsByDataCenter(rm, hintedEndpoints, responseHandler);
-
- // Traverse all dataCenters where messages will
be sent to.
- for (Map.Entry<String, Collection<Pair<Message, InetAddress>>>
entry : dcMap.asMap().entrySet())
- {
- String dataCenter = entry.getKey();
-
- // Grab a set of all the messages bound for this
dataCenter and create an iterator over this set.
- Collection<Pair<Message, InetAddress>>
messagesForDataCenter = entry.getValue();
- Iterator<Pair<Message, InetAddress>> iter =
messagesForDataCenter.iterator();
- assert iter.hasNext();
- // First endpoint in list is the destination for this group
- Pair<Message, InetAddress> messageAndDestination =
iter.next();
-
- Message primaryMessage = messageAndDestination.left;
- InetAddress target = messageAndDestination.right;
+ responseHandlers.add(responseHandler);
+ Message unhintedMessage = null;
+ for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
+ {
+ InetAddress destination = entry.getKey();
+ Collection<InetAddress> targets = entry.getValue();
- // Add all the other destinations that are bound for the
same dataCenter as a header in the primary message.
- while (iter.hasNext())
+ if (targets.size() == 1 &&
targets.iterator().next().equals(destination))
{
- messageAndDestination = iter.next();
- assert messageAndDestination.left == primaryMessage;
-
- if (dataCenter.equals(localDataCenter))
+ // unhinted writes
+ if (destination.equals(FBUtilities.getLocalAddress()))
{
- // direct write to local DC
- assert primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null;
- MessagingService.instance.sendOneWay(primaryMessage, target);
+ insertLocalMessage(rm, responseHandler);
}
else
{
- // group all nodes in this DC as forward headers
on the primary message
- ByteArrayOutputStream bos = new
ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
- // append to older addresses
- byte[] previousHints =
primaryMessage.getHeader(RowMutation.FORWARD_HEADER);
- if (previousHints != null)
- dos.write(previousHints);
-
- dos.write(messageAndDestination.right.getAddress());
- primaryMessage.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+ // belongs on a different server. send it there.
+ if (unhintedMessage == null)
+ {
+ unhintedMessage = rm.makeRowMutationMessage();
+ MessagingService.instance.addCallback(responseHandler, unhintedMessage.getMessageId());
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
+ MessagingService.instance.sendOneWay(unhintedMessage, destination);
}
- }
-
- MessagingService.instance.sendOneWay(primaryMessage,
target);
+ }
+ else
+ {
+ // hinted
+ Message hintedMessage = rm.makeRowMutationMessage();
+ for (InetAddress target : targets)
+ {
+ if (!target.equals(destination))
+ {
+ addHintHeader(hintedMessage, target);
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
+ }
+ }
+ responseHandler.addHintCallback(hintedMessage,
destination);
+ MessagingService.instance.sendOneWay(hintedMessage,
destination);
+ }
}
}
-
// wait for writes. throws timeoutexception if necessary
for (IWriteResponseHandler responseHandler : responseHandlers)
- {
+ {
responseHandler.get();
}
}
@@ -192,66 +178,6 @@ public class StorageProxy implements Sto
}
}
-
- private static Multimap<String, Pair<Message, InetAddress>>
groupEndpointsByDataCenter(RowMutation rm, Multimap<InetAddress, InetAddress>
endpoints, final IWriteResponseHandler responseHandler) throws IOException
- {
-
- Set<Map.Entry<InetAddress, Collection<InetAddress>>> endpointSet =
endpoints.asMap().entrySet();
- Multimap<String, Pair<Message, InetAddress>> dcMap =
HashMultimap.create(endpointSet.size(), 10);
- Message unhintedMessage = null;
-
- for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
endpointSet)
- {
- InetAddress destination = entry.getKey();
- Collection<InetAddress> targets = entry.getValue();
-
- String dataCenter = getDataCenter(destination);
-
- if (targets.size() == 1 &&
targets.iterator().next().equals(destination))
- {
- // unhinted writes
- if (destination.equals(FBUtilities.getLocalAddress()))
- {
- insertLocalMessage(rm, responseHandler);
- }
- else
- {
- // belongs on a different server.
- if (unhintedMessage == null)
- {
- unhintedMessage = rm.makeRowMutationMessage();
- MessagingService.instance.addCallback(responseHandler,
unhintedMessage.getMessageId());
- }
-
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " +
FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() +
"@" + destination);
-
- dcMap.put(dataCenter, new
Pair<Message,InetAddress>(unhintedMessage, destination));
- }
- }
- else
- {
- // hinted
- Message hintedMessage = rm.makeRowMutationMessage();
-
- for (InetAddress target : targets)
- {
- if (!target.equals(destination))
- {
- addHintHeader(hintedMessage, target);
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
- }
- }
-
- responseHandler.addHintCallback(hintedMessage, destination);
- dcMap.put(dataCenter, new
Pair<Message,InetAddress>(hintedMessage, destination));
- }
- }
-
- return dcMap;
- }
-
private static void addHintHeader(Message message, InetAddress target)
throws IOException
{
@@ -266,22 +192,6 @@ public class StorageProxy implements Sto
message.setHeader(RowMutation.HINT, bos.toByteArray());
}
- private static String getDataCenter(InetAddress addr)
- {
- String dataCenter = null;
- try
- {
- dataCenter =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(addr);
- }
- catch (UnsupportedOperationException e)
- {
- // SimpleSnitch throws this
- dataCenter = "default";
- }
-
- return dataCenter;
- }
-
private static void insertLocalMessage(final RowMutation rm, final
IWriteResponseHandler responseHandler)
{
if (logger.isDebugEnabled())