Author: jbellis
Date: Tue Mar 2 21:25:39 2010
New Revision: 918189
URL: http://svn.apache.org/viewvc?rev=918189&view=rev
Log:
assign write hints to endpoints already getting a write message, if any. patch
by jbellis; reviewed by Ryan King for CASSANDRA-822
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=918189&r1=918188&r2=918189&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Tue Mar 2 21:25:39 2010
@@ -52,6 +52,11 @@
snitch_ = snitch;
}
+ /**
+ * get the endpoints that should store the given Token, for the given
table.
+ * Note that while the endpoints are conceptually a Set (no duplicates
will be included),
+ * we return a List to avoid an extra allocation when sorting by proximity
later.
+ */
public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token,
TokenMetadata metadata, String table);
public WriteResponseHandler getWriteResponseHandler(int blockFor,
ConsistencyLevel consistency_level, String table)
@@ -65,51 +70,43 @@
}
/**
- * returns map of {ultimate target: destination}, where if destination is
not the same
- * as the ultimate target, it is a "hinted" node, a node that will deliver
the data to
+ * returns multimap of {live destination: ultimate targets}, where if
target is not the same
+ * as the destination, it is a "hinted" write, and will need to be sent to
* the ultimate target when it becomes alive again.
- *
- * A destination node may be the destination for multiple targets.
*/
- public Map<InetAddress, InetAddress> getHintedEndpoints(Token token,
String table, Collection<InetAddress> naturalEndpoints)
+ public Multimap<InetAddress, InetAddress> getHintedEndpoints(String table,
Collection<InetAddress> targets)
{
- Collection<InetAddress> targets = getWriteEndpoints(token, table,
naturalEndpoints);
- Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
- Map<InetAddress, InetAddress> map = new HashMap<InetAddress,
InetAddress>();
+ Multimap<InetAddress, InetAddress> map =
HashMultimap.create(targets.size(), 1);
IEndPointSnitch endPointSnitch =
DatabaseDescriptor.getEndPointSnitch(table);
- Set<InetAddress> liveNodes = Gossiper.instance.getLiveMembers();
+ // first, add the live endpoints
for (InetAddress ep : targets)
{
if (FailureDetector.instance.isAlive(ep))
- {
map.put(ep, ep);
- usedEndpoints.add(ep);
- }
- else
- {
- // find another endpoint to store a hint on. prefer endpoints
that aren't already in use
- InetAddress hintLocation = null;
- List<InetAddress> preferred =
endPointSnitch.getSortedListByProximity(ep, liveNodes);
-
- for (InetAddress hintCandidate : preferred)
- {
- if (!targets.contains(hintCandidate)
- && !usedEndpoints.contains(hintCandidate)
- && tokenMetadata_.isMember(hintCandidate))
- {
- hintLocation = hintCandidate;
- break;
- }
- }
- // if all endpoints are already in use, might as well store it
locally to save the network trip
- if (hintLocation == null)
- hintLocation = FBUtilities.getLocalAddress();
+ }
- map.put(ep, hintLocation);
- usedEndpoints.add(hintLocation);
- }
+ if (map.size() == targets.size())
+ return map; // everything was alive
+
+ // assign dead endpoints to be hinted to the closest live one, or to
the local node
+ // (since it is trivially the closest) if none are alive. This way,
the cost of doing
+ // a hint is only adding the hint header, rather than doing a full
extra write, if any
+ // destination nodes are alive.
+ //
+ // we do a 2nd pass on targets instead of using temporary storage,
+ // to optimize for the common case (everything was alive).
+ InetAddress localAddress = FBUtilities.getLocalAddress();
+ for (InetAddress ep : targets)
+ {
+ if (map.containsKey(ep))
+ continue;
+
+ InetAddress destination = map.isEmpty()
+ ? localAddress
+ :
endPointSnitch.getSortedListByProximity(localAddress, map.keySet()).get(0);
+ map.put(destination, ep);
}
return map;
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=918189&r1=918188&r2=918189&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Mar 2 21:25:39 2010
@@ -20,38 +20,40 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
-import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import java.net.InetAddress;
-
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LatencyTracker;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LatencyTracker;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.concurrent.StageManager;
-
-import org.apache.log4j.Logger;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
public class StorageProxy implements StorageProxyMBean
@@ -101,25 +103,30 @@
long startTime = System.nanoTime();
try
{
+ StorageService ss = StorageService.instance;
for (final RowMutation rm: mutations)
{
try
{
- List<InetAddress> naturalEndpoints =
StorageService.instance.getNaturalEndpoints(rm.getTable(), rm.key());
- Map<InetAddress, InetAddress> endpointMap =
StorageService.instance.getHintedEndpointMap(rm.getTable(), rm.key(),
naturalEndpoints);
+ String table = rm.getTable();
+ AbstractReplicationStrategy rs =
ss.getReplicationStrategy(table);
+
+ List<InetAddress> naturalEndpoints =
ss.getNaturalEndpoints(table, rm.key());
+ Multimap<InetAddress,InetAddress> hintedEndpoints =
rs.getHintedEndpoints(table, naturalEndpoints);
Message unhintedMessage = null; // lazy initialize for
non-local, unhinted writes
// 3 cases:
// 1. local, unhinted write: run directly on write stage
// 2. non-local, unhinted write: send row mutation message
// 3. hinted write: add hint header, and send message
- for (Map.Entry<InetAddress, InetAddress> entry :
endpointMap.entrySet())
+ for (Map.Entry<InetAddress, Collection<InetAddress>> entry
: hintedEndpoints.asMap().entrySet())
{
- InetAddress target = entry.getKey();
- InetAddress hintedTarget = entry.getValue();
- if (target.equals(hintedTarget))
+ InetAddress destination = entry.getKey();
+ Collection<InetAddress> targets = entry.getValue();
+ if (targets.size() == 1 &&
targets.iterator().next().equals(destination))
{
- if (target.equals(FBUtilities.getLocalAddress()))
+ // unhinted writes
+ if
(destination.equals(FBUtilities.getLocalAddress()))
{
if (logger.isDebugEnabled())
logger.debug("insert writing local key " +
rm.key());
@@ -137,17 +144,24 @@
if (unhintedMessage == null)
unhintedMessage =
rm.makeRowMutationMessage();
if (logger.isDebugEnabled())
- logger.debug("insert writing key " +
rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
-
MessagingService.instance.sendOneWay(unhintedMessage, target);
+ logger.debug("insert writing key " +
rm.key() + " to " + unhintedMessage.getMessageId() + "@" + destination);
+
MessagingService.instance.sendOneWay(unhintedMessage, destination);
}
}
else
{
+ // hinted
Message hintedMessage =
rm.makeRowMutationMessage();
- addHintHeader(hintedMessage, target);
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " + rm.key()
+ " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " +
target);
-
MessagingService.instance.sendOneWay(hintedMessage, hintedTarget);
+ for (InetAddress target : targets)
+ {
+ if (!target.equals(destination))
+ {
+ addHintHeader(hintedMessage, target);
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " +
rm.key() + " to " + hintedMessage.getMessageId() + "@" + destination + " for "
+ target);
+ }
+ }
+
MessagingService.instance.sendOneWay(hintedMessage, destination);
}
}
}
@@ -176,31 +190,36 @@
ArrayList<WriteResponseHandler> responseHandlers = new
ArrayList<WriteResponseHandler>();
RowMutation mostRecentRowMutation = null;
+ StorageService ss = StorageService.instance;
try
{
- for (RowMutation rm: mutations)
+ for (RowMutation rm : mutations)
{
mostRecentRowMutation = rm;
- List<InetAddress> naturalEndpoints =
StorageService.instance.getNaturalEndpoints(rm.getTable(), rm.key());
- Map<InetAddress, InetAddress> endpointMap =
StorageService.instance.getHintedEndpointMap(rm.getTable(), rm.key(),
naturalEndpoints);
- int blockFor = determineBlockFor(endpointMap.size(),
consistency_level);
-
+ String table = rm.getTable();
+ AbstractReplicationStrategy rs =
ss.getReplicationStrategy(table);
+
+ List<InetAddress> naturalEndpoints =
ss.getNaturalEndpoints(table, rm.key());
+ Collection<InetAddress> writeEndpoints =
rs.getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table,
naturalEndpoints);
+ Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(table, writeEndpoints);
+ int blockFor = determineBlockFor(writeEndpoints.size(),
consistency_level);
+
// avoid starting a write we know can't achieve the required
consistency
- assureSufficientLiveNodes(endpointMap, blockFor,
consistency_level);
+ assureSufficientLiveNodes(blockFor, writeEndpoints,
hintedEndpoints, consistency_level);
- // send out the writes, as in insert() above, but this time
with a callback that tracks responses
- final WriteResponseHandler responseHandler =
StorageService.instance.getWriteResponseHandler(blockFor, consistency_level,
rm.getTable());
+ // send out the writes, as in mutate() above, but this time
with a callback that tracks responses
+ final WriteResponseHandler responseHandler =
ss.getWriteResponseHandler(blockFor, consistency_level, table);
responseHandlers.add(responseHandler);
Message unhintedMessage = null;
- for (Map.Entry<InetAddress, InetAddress> entry :
endpointMap.entrySet())
+ for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
{
- InetAddress naturalTarget = entry.getKey();
- InetAddress maybeHintedTarget = entry.getValue();
-
- if (naturalTarget.equals(maybeHintedTarget))
+ InetAddress destination = entry.getKey();
+ Collection<InetAddress> targets = entry.getValue();
+
+ if (targets.size() == 1 &&
targets.iterator().next().equals(destination))
{
- // not hinted
- if
(naturalTarget.equals(FBUtilities.getLocalAddress()))
+ // unhinted writes
+ if (destination.equals(FBUtilities.getLocalAddress()))
{
insertLocalMessage(rm, responseHandler);
}
@@ -213,20 +232,27 @@
MessagingService.instance.addCallback(responseHandler,
unhintedMessage.getMessageId());
}
if (logger.isDebugEnabled())
- logger.debug("insert writing key " + rm.key()
+ " to " + unhintedMessage.getMessageId() + "@" + naturalTarget);
-
MessagingService.instance.sendOneWay(unhintedMessage, naturalTarget);
+ logger.debug("insert writing key " + rm.key()
+ " to " + unhintedMessage.getMessageId() + "@" + destination);
+
MessagingService.instance.sendOneWay(unhintedMessage, destination);
}
}
else
{
+ // hinted
Message hintedMessage = rm.makeRowMutationMessage();
- addHintHeader(hintedMessage, naturalTarget);
- // (hints are part of the callback and count towards
consistency only under CL.ANY
- if (consistency_level == ConsistencyLevel.ANY)
+ for (InetAddress target : targets)
+ {
+ if (!target.equals(destination))
+ {
+ addHintHeader(hintedMessage, target);
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " +
rm.key() + " to " + hintedMessage.getMessageId() + "@" + destination + " for "
+ target);
+ }
+ }
+ // (non-destination hints are part of the callback and
count towards consistency only under CL.ANY)
+ if (writeEndpoints.contains(destination) ||
consistency_level == ConsistencyLevel.ANY)
MessagingService.instance.addCallback(responseHandler,
hintedMessage.getMessageId());
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " + rm.key() + "
to " + hintedMessage.getMessageId() + "@" + maybeHintedTarget + " for " +
naturalTarget);
- MessagingService.instance.sendOneWay(hintedMessage,
maybeHintedTarget);
+ MessagingService.instance.sendOneWay(hintedMessage,
destination);
}
}
}
@@ -250,30 +276,26 @@
}
- private static void assureSufficientLiveNodes(Map<InetAddress,
InetAddress> endpointMap, int blockFor, ConsistencyLevel consistencyLevel)
+ private static void assureSufficientLiveNodes(int blockFor,
Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress>
hintedEndpoints, ConsistencyLevel consistencyLevel)
throws UnavailableException
{
if (consistencyLevel == ConsistencyLevel.ANY)
{
// ensure there are blockFor distinct living nodes (hints are ok).
- if (new HashSet(endpointMap.values()).size() < blockFor)
+ if (hintedEndpoints.keySet().size() < blockFor)
throw new UnavailableException();
}
- else
+
+ // count destinations that are part of the desired target set
+ int liveNodes = 0;
+ for (InetAddress destination : hintedEndpoints.keySet())
{
- // only count live + unhinted nodes.
- int liveNodes = 0;
- for (Map.Entry<InetAddress, InetAddress> entry :
endpointMap.entrySet())
- {
- if (entry.getKey().equals(entry.getValue()))
- {
- liveNodes++;
- }
- }
- if (liveNodes < blockFor)
- {
- throw new UnavailableException();
- }
+ if (writeEndpoints.contains(destination))
+ liveNodes++;
+ }
+ if (liveNodes < blockFor)
+ {
+ throw new UnavailableException();
}
}
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=918189&r1=918188&r2=918189&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Tue Mar 2 21:25:39 2010
@@ -1177,18 +1177,6 @@
}
/**
- * This method returns the N endpoints that are responsible for storing the
- * specified key i.e for replication.
- *
- * @param key - key for which we need to find the endpoint return value -
- * the endpoint responsible for this key
- */
- public Map<InetAddress, InetAddress> getHintedEndpointMap(String table,
String key, List<InetAddress> naturalEndpoints)
- {
- return
getReplicationStrategy(table).getHintedEndpoints(partitioner_.getToken(key),
table, naturalEndpoints);
- }
-
- /**
* This function finds the closest live endpoint that contains a given key.
*/
public InetAddress findSuitableEndPoint(String table, String key) throws
IOException, UnavailableException