Author: jbellis
Date: Wed Dec 16 21:28:55 2009
New Revision: 891430
URL: http://svn.apache.org/viewvc?rev=891430&view=rev
Log:
merge from 0.5
Modified:
incubator/cassandra/trunk/ (props changed)
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/NEWS.txt
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
(props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
(props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
(props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
(props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
(props changed)
incubator/cassandra/trunk/src/java/org/ (props changed)
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
incubator/cassandra/trunk/test/unit/org/ (props changed)
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Propchange: incubator/cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,3 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5:888872-890627
+/incubator/cassandra/branches/cassandra-0.5:888872-891425
Modified: incubator/cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Dec 16 21:28:55 2009
@@ -2,6 +2,10 @@
* Fix potential NPE in get_range_slice (CASSANDRA-623)
* add CRC32 to commitlog entries (CASSANDRA-605)
* fix data streaming on windows (CASSANDRA-630)
+ * GC compacted sstables after cleanup and compaction (CASSANDRA-621)
+ * Speed up anti-entropy validation (CASSANDRA-629)
+ * Fix pending range conflicts when bootstrapping or moving
+ multiple nodes at once (CASSANDRA-603)
0.5.0 beta 2
Modified: incubator/cassandra/trunk/NEWS.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/NEWS.txt?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
--- incubator/cassandra/trunk/NEWS.txt (original)
+++ incubator/cassandra/trunk/NEWS.txt Wed Dec 16 21:28:55 2009
@@ -8,6 +8,10 @@
out; if that happens, just go back to 0.4 and flush again.)
The format changed twice: from 0.4 to beta1, and from beta2 to RC1.
+.5 The gossip protocol has changed, meaning 0.5 nodes cannot coexist
+ in a cluster of 0.4 nodes or vice versa; you must upgrade your
+ whole cluster at the same time.
+
1. Bootstrap, move, load balancing, and active repair have been added.
See http://wiki.apache.org/cassandra/Operations. When upgrading
from 0.4, leave autobootstrap set to false for the first restart
Propchange:
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-891425
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java:749219-768588
Propchange:
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,5 +1,5 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-891425
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java:749219-794428
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/column_t.java:749219-768588
Propchange:
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-891425
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:749219-768588
Propchange:
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-891425
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:749219-768588
Propchange:
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,5 +1,5 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-891425
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:749219-794428
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:749219-768588
Propchange: incubator/cassandra/trunk/src/java/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/src/java/org:774578-796573
/incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-891425
/incubator/cassandra/trunk/src/java/org:749219-769885
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Wed Dec 16 21:28:55 2009
@@ -91,11 +91,11 @@
List<InetAddress> endpoints = new
ArrayList<InetAddress>(naturalEndpoints);
- for (Map.Entry<Range, InetAddress> entry :
tokenMetadata_.getPendingRanges().entrySet())
+ for (Map.Entry<Range, Collection<InetAddress>> entry :
tokenMetadata_.getPendingRanges().entrySet())
{
if (entry.getKey().contains(token))
{
- endpoints.add(entry.getValue());
+ endpoints.addAll(entry.getValue());
}
}
@@ -202,26 +202,9 @@
public Collection<Range> getPendingAddressRanges(TokenMetadata metadata,
Token pendingToken, InetAddress pendingAddress)
{
- TokenMetadata temp = metadata.cloneWithoutPending();
- temp.update(pendingToken, pendingAddress);
+ TokenMetadata temp = metadata.cloneOnlyTokenMap();
+ temp.updateNormalToken(pendingToken, pendingAddress);
return getAddressRanges(temp).get(pendingAddress);
}
- public void removeObsoletePendingRanges()
- {
- Multimap<InetAddress, Range> ranges = getAddressRanges();
- for (Map.Entry<Range, InetAddress> entry :
tokenMetadata_.getPendingRanges().entrySet())
- {
- for (Range currentRange : ranges.get(entry.getValue()))
- {
- if (currentRange.contains(entry.getKey()))
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Removing obsolete pending range " +
entry.getKey() + " from " + entry.getValue());
- tokenMetadata_.removePendingRange(entry.getKey());
- break;
- }
- }
- }
- }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
Wed Dec 16 21:28:55 2009
@@ -29,15 +29,33 @@
import org.apache.commons.lang.StringUtils;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
public class TokenMetadata
{
/* Maintains token to endpoint map of every node in the cluster. */
private BiMap<Token, InetAddress> tokenToEndPointMap;
- private Map<Range, InetAddress> pendingRanges;
+
+ // Suppose that there is a ring of nodes A, C and E, with replication
factor 3.
+ // Node D bootstraps between C and E, so its pending ranges will be E-A,
A-C and C-D.
+ // Now suppose node B bootstraps between A and C at the same time. Its
pending ranges would be C-E, E-A and A-B.
+ // Now both nodes have pending range E-A in their list, which will cause a
pending range collision
+ // even though we're only talking about a replica range, not even a primary
range. The same thing happens
+ // for any nodes that boot simultaneously between the same two nodes. We
cannot fix this simply by making pending ranges a multimap,
+ // since that would make us unable to notice the real problem of two nodes
trying to boot using the same token.
+ // In order to do this properly, we need to know what tokens are booting
at any time.
+ private Map<Token, InetAddress> bootstrapTokens;
+
+ // We need to know at all times which nodes are leaving and calculate
ranges accordingly.
+ // An anonymous pending ranges list is not enough, as that does not tell
which node is leaving
+ // and/or if the ranges are there because of bootstrap or leave operation.
+ // (See CASSANDRA-603 for more detail + examples).
+ private Set<InetAddress> leavingEndPoints;
+
+ private Multimap<Range, InetAddress> pendingRanges;
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -53,7 +71,9 @@
if (tokenToEndPointMap == null)
tokenToEndPointMap = HashBiMap.create();
this.tokenToEndPointMap = tokenToEndPointMap;
- pendingRanges = new NonBlockingHashMap<Range, InetAddress>();
+ bootstrapTokens = new HashMap<Token, InetAddress>();
+ leavingEndPoints = new HashSet<InetAddress>();
+ pendingRanges = HashMultimap.create();
sortedTokens = sortTokens();
}
@@ -69,18 +89,13 @@
{
int n = 0;
Range sourceRange = getPrimaryRangeFor(getToken(source));
- for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
- {
- if (sourceRange.contains(entry.getKey()) ||
entry.getValue().equals(source))
+ for (Token token : bootstrapTokens.keySet())
+ if (sourceRange.contains(token))
n++;
- }
return n;
}
- /**
- * Update the two maps in an safe mode.
- */
- public void update(Token token, InetAddress endpoint)
+ public void updateNormalToken(Token token, InetAddress endpoint)
{
assert token != null;
assert endpoint != null;
@@ -88,6 +103,8 @@
lock.writeLock().lock();
try
{
+ bootstrapTokens.remove(token);
+
tokenToEndPointMap.inverse().remove(endpoint);
if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
{
@@ -100,13 +117,49 @@
}
}
+ public void addBootstrapToken(Token token, InetAddress endpoint)
+ {
+ assert token != null;
+ assert endpoint != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ InetAddress oldEndPoint = bootstrapTokens.get(token);
+ if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
+ throw new RuntimeException("Bootstrap Token collision between
" + oldEndPoint + " and " + endpoint + " (token " + token);
+ bootstrapTokens.put(token, endpoint);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void addLeavingEndPoint(InetAddress endpoint)
+ {
+ assert endpoint != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ leavingEndPoints.add(endpoint);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public void removeEndpoint(InetAddress endpoint)
{
assert tokenToEndPointMap.containsValue(endpoint);
lock.writeLock().lock();
try
{
+ bootstrapTokens.remove(getToken(endpoint));
tokenToEndPointMap.inverse().remove(endpoint);
+ leavingEndPoints.remove(endpoint);
sortedTokens = sortTokens();
}
finally
@@ -161,7 +214,11 @@
}
}
- public TokenMetadata cloneWithoutPending()
+ /**
+ * Create a copy of TokenMetadata with only tokenToEndPointMap. That is,
pending ranges,
+ * bootstrap tokens and leaving endpoints are not included in the copy.
+ */
+ public TokenMetadata cloneOnlyTokenMap()
{
lock.readLock().lock();
try
@@ -174,28 +231,24 @@
}
}
- public String toString()
+ /**
+ * Create a copy of TokenMetadata with tokenToEndPointMap reflecting
situation after all
+ * current leave operations have finished.
+ */
+ public TokenMetadata cloneAfterAllLeft()
{
- StringBuilder sb = new StringBuilder();
lock.readLock().lock();
try
{
- Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
-
- for (InetAddress ep : eps)
- {
- sb.append(ep);
- sb.append(":");
- sb.append(tokenToEndPointMap.inverse().get(ep));
- sb.append(System.getProperty("line.separator"));
- }
+ TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
+ for (InetAddress endPoint : leavingEndPoints)
+ allLeftMetadata.removeEndpoint(endPoint);
+ return allLeftMetadata;
}
finally
{
lock.readLock().unlock();
}
-
- return sb.toString();
}
public InetAddress getEndPoint(Token token)
@@ -211,12 +264,6 @@
}
}
- public void clearUnsafe()
- {
- tokenToEndPointMap.clear();
- pendingRanges.clear();
- }
-
public Range getPrimaryRangeFor(Token right)
{
return new Range(getPredecessor(right), right);
@@ -235,29 +282,16 @@
}
}
- public void addPendingRange(Range range, InetAddress endpoint)
- {
- InetAddress oldEndpoint = pendingRanges.get(range);
- if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
- throw new RuntimeException("pending range collision between " +
oldEndpoint + " and " + endpoint);
- pendingRanges.put(range, endpoint);
- }
-
- public void removePendingRange(Range range)
- {
- pendingRanges.remove(range);
- }
-
/** a mutable map may be returned but caller should not modify it */
- public Map<Range, InetAddress> getPendingRanges()
+ public Map<Range, Collection<InetAddress>> getPendingRanges()
{
- return pendingRanges;
+ return pendingRanges.asMap();
}
public List<Range> getPendingRanges(InetAddress endpoint)
{
List<Range> ranges = new ArrayList<Range>();
- for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
+ for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
{
if (entry.getValue().equals(endpoint))
{
@@ -267,6 +301,11 @@
return ranges;
}
+ public void setPendingRanges(Multimap<Range, InetAddress> pendingRanges)
+ {
+ this.pendingRanges = pendingRanges;
+ }
+
public Token getPredecessor(Token token)
{
List tokens = sortedTokens();
@@ -288,8 +327,96 @@
return getEndPoint(getSuccessor(getToken(endPoint)));
}
- public void clearPendingRanges()
+ /** caller should not modify bootstrapTokens */
+ public Map<Token, InetAddress> getBootstrapTokens()
+ {
+ return bootstrapTokens;
+ }
+
+ /** caller should not modify leavingEndPoints */
+ public Set<InetAddress> getLeavingEndPoints()
{
+ return leavingEndPoints;
+ }
+
+ /** used by tests */
+ public void clearUnsafe()
+ {
+ bootstrapTokens.clear();
+ tokenToEndPointMap.clear();
+ leavingEndPoints.clear();
pendingRanges.clear();
}
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ lock.readLock().lock();
+ try
+ {
+ Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
+
+ if (!eps.isEmpty())
+ {
+ sb.append("Normal Tokens:");
+ sb.append(System.getProperty("line.separator"));
+ for (InetAddress ep : eps)
+ {
+ sb.append(ep);
+ sb.append(":");
+ sb.append(tokenToEndPointMap.inverse().get(ep));
+ sb.append(System.getProperty("line.separator"));
+ }
+ }
+
+ if (!bootstrapTokens.isEmpty())
+ {
+ sb.append("Bootstrapping Tokens:" );
+ sb.append(System.getProperty("line.separator"));
+ for (Map.Entry<Token, InetAddress> entry :
bootstrapTokens.entrySet())
+ {
+ sb.append(entry.getValue() + ":" + entry.getKey());
+ sb.append(System.getProperty("line.separator"));
+ }
+ }
+
+ if (!leavingEndPoints.isEmpty())
+ {
+ sb.append("Leaving EndPoints:");
+ sb.append(System.getProperty("line.separator"));
+ for (InetAddress ep : leavingEndPoints)
+ {
+ sb.append(ep);
+ sb.append(System.getProperty("line.separator"));
+ }
+ }
+
+ if (!pendingRanges.isEmpty())
+ {
+ sb.append("Pending Ranges:");
+ sb.append(System.getProperty("line.separator"));
+ sb.append(printPendingRanges());
+ }
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+
+ return sb.toString();
+ }
+
+ public String printPendingRanges()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ for (Map.Entry<Range, InetAddress> entry : pendingRanges.entries())
+ {
+ sb.append(entry.getValue() + ":" + entry.getKey());
+ sb.append(System.getProperty("line.separator"));
+ }
+
+ return sb.toString();
+ }
+
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Dec 16 21:28:55 2009
@@ -183,7 +183,7 @@
if (logger_.isDebugEnabled())
logger_.debug("Setting token to " + token);
SystemTable.updateToken(token);
- tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+ tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
}
public StorageService()
@@ -306,7 +306,7 @@
{
SystemTable.setBootstrapped(true);
Token token = storageMetadata_.getToken();
- tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+ tokenMetadata_.updateNormalToken(token,
FBUtilities.getLocalAddress());
Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new
ApplicationState(partitioner_.getTokenFactory().toString(token)));
}
@@ -407,23 +407,25 @@
Token token =
getPartitioner().getTokenFactory().fromString(state.getValue());
if (logger_.isDebugEnabled())
logger_.debug(endpoint + " state bootstrapping, token " +
token);
- updateBootstrapRanges(token, endpoint);
+ tokenMetadata_.addBootstrapToken(token, endpoint);
+ calculatePendingRanges();
}
else if (STATE_NORMAL.equals(stateName))
{
Token token =
getPartitioner().getTokenFactory().fromString(state.getValue());
if (logger_.isDebugEnabled())
logger_.debug(endpoint + " state normal, token " + token);
- tokenMetadata_.update(token, endpoint);
+ tokenMetadata_.updateNormalToken(token, endpoint);
+ calculatePendingRanges();
if (!isClientMode)
SystemTable.updateToken(endpoint, token);
- replicationStrategy_.removeObsoletePendingRanges();
}
else if (STATE_LEAVING.equals(stateName))
{
Token token =
getPartitioner().getTokenFactory().fromString(state.getValue());
assert tokenMetadata_.getToken(endpoint).equals(token);
- updateLeavingRanges(endpoint);
+ tokenMetadata_.addLeavingEndPoint(endpoint);
+ calculatePendingRanges();
}
else if (STATE_LEFT.equals(stateName))
{
@@ -442,6 +444,7 @@
logger_.debug(endpoint + " state left, token " +
token);
assert tokenMetadata_.getToken(endpoint).equals(token);
tokenMetadata_.removeEndpoint(endpoint);
+ calculatePendingRanges();
}
}
else
@@ -454,11 +457,94 @@
{
restoreReplicaCount(endPointThatLeft);
tokenMetadata_.removeEndpoint(endPointThatLeft);
+ calculatePendingRanges();
}
}
+ }
+ }
+
+ /**
+ * Calculate pending ranges according to bootstrapping and leaving nodes.
Reasoning is:
+ *
+ * (1) When in doubt, it is better to write too much to a node than too
little. That is, if
+ * there are multiple nodes moving, calculate the biggest ranges a node
could have. Cleaning
+ * up unneeded data afterwards is better than missing writes during
movement.
+ * (2) When a node leaves, ranges for other nodes can only grow (a node
might get additional
+ * ranges, but it will not lose any of its current ranges as a result of a
leave). Therefore
+ * we will first remove _all_ leaving tokens for the sake of calculation
and then check what
+ * ranges would go where if all nodes are to leave. This way we get the
biggest possible
ranges with regard to current leave operations, covering all subsets of
possible final range
+ * values.
+ * (3) When a node bootstraps, ranges of other nodes can only get smaller.
Without doing
+ * complex calculations to see if multiple bootstraps overlap, we simply
base calculations
+ * on the same token ring used before (reflecting situation after all
leave operations have
+ * completed). Bootstrapping nodes will be added and removed one by one to
that metadata and
+ * checked what their ranges would be. This will give us the biggest
possible ranges the
+ * node could have. It might be that other bootstraps make our actual
final ranges smaller,
+ * but it does not matter as we can clean up the data afterwards.
+ *
+ * NOTE: This is a heavy and inefficient operation. This will be done only
once when a node
+ * changes state in the cluster, so it should be manageable.
+ */
+ private void calculatePendingRanges()
+ {
+ calculatePendingRanges(tokenMetadata_, replicationStrategy_);
+ }
+
+ // public & static for testing purposes
+ public static void calculatePendingRanges(TokenMetadata tm,
AbstractReplicationStrategy strategy)
+ {
+ Multimap<Range, InetAddress> pendingRanges = HashMultimap.create();
+ Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+ Set<InetAddress> leavingEndPoints = tm.getLeavingEndPoints();
+
+ if (bootstrapTokens.isEmpty() && leavingEndPoints.isEmpty())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("No bootstrapping or leaving nodes -> empty
pending ranges");
+ tm.setPendingRanges(pendingRanges);
+ return;
+ }
+
+ Multimap<InetAddress, Range> addressRanges =
strategy.getAddressRanges();
+
+ // Copy of metadata reflecting the situation after all leave
operations are finished.
+ TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
+
+ // get all ranges that will be affected by leaving nodes
+ Set<Range> affectedRanges = new HashSet<Range>();
+ for (InetAddress endPoint : leavingEndPoints)
+ affectedRanges.addAll(addressRanges.get(endPoint));
+
+ // for each of those ranges, find what new nodes will be responsible
for the range when
+ // all leaving nodes are gone.
+ for (Range range : affectedRanges)
+ {
+ List<InetAddress> currentEndPoints =
strategy.getNaturalEndpoints(range.right(), tm);
+ List<InetAddress> newEndPoints =
strategy.getNaturalEndpoints(range.right(), allLeftMetadata);
+ newEndPoints.removeAll(currentEndPoints);
+ pendingRanges.putAll(range, newEndPoints);
+ }
+
+ // At this stage pendingRanges has been updated according to leave
operations. We can
+ // now finish the calculation by checking bootstrapping nodes.
+
+ // For each of the bootstrapping nodes, simply add and remove them one
by one to
+ // allLeftMetadata and check in between what their ranges would be.
+ for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+ {
+ InetAddress endPoint = entry.getValue();
- replicationStrategy_.removeObsoletePendingRanges();
+ allLeftMetadata.updateNormalToken(entry.getKey(), endPoint);
+ for (Range range :
strategy.getAddressRanges(allLeftMetadata).get(endPoint))
+ pendingRanges.put(range, endPoint);
+ allLeftMetadata.removeEndpoint(endPoint);
}
+
+ tm.setPendingRanges(pendingRanges);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Pending ranges:\n" + tm.printPendingRanges());
}
/**
@@ -534,7 +620,7 @@
Collection<Range> ranges = getRangesForEndPoint(endpoint);
if (logger_.isDebugEnabled())
- logger_.debug("leaving node ranges are [" +
StringUtils.join(ranges, ", ") + "]");
+ logger_.debug("Node " + endpoint + " ranges [" +
StringUtils.join(ranges, ", ") + "]");
Map<Range, ArrayList<InetAddress>> currentReplicaEndpoints = new
HashMap<Range, ArrayList<InetAddress>>();
@@ -542,7 +628,7 @@
for (Range range : ranges)
currentReplicaEndpoints.put(range,
replicationStrategy_.getNaturalEndpoints(range.right(), tokenMetadata_));
- TokenMetadata temp = tokenMetadata_.cloneWithoutPending();
+ TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
temp.removeEndpoint(endpoint);
Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
@@ -557,43 +643,13 @@
ArrayList<InetAddress> newReplicaEndpoints =
replicationStrategy_.getNaturalEndpoints(range.right(), temp);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger_.isDebugEnabled())
- logger_.debug("adding pending range " + range + " to endpoints
" + StringUtils.join(newReplicaEndpoints, ", "));
+ logger_.debug("Range " + range + " will be responsibility of "
+ StringUtils.join(newReplicaEndpoints, ", "));
changedRanges.putAll(range, newReplicaEndpoints);
}
return changedRanges;
}
- private void updateLeavingRanges(final InetAddress endpoint)
- {
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " is leaving; calculating pendingranges");
- Multimap<Range, InetAddress> ranges =
getChangedRangesForLeaving(endpoint);
- for (Range range : ranges.keySet())
- {
- for (InetAddress newEndpoint : ranges.get(range))
- {
- tokenMetadata_.addPendingRange(range, newEndpoint);
- }
- }
- }
-
- private void updateBootstrapRanges(Token token, InetAddress endpoint)
- {
- for (Range range :
replicationStrategy_.getPendingAddressRanges(tokenMetadata_, token, endpoint))
- {
- tokenMetadata_.addPendingRange(range, endpoint);
- }
- }
-
- public static void updateBootstrapRanges(AbstractReplicationStrategy
strategy, TokenMetadata metadata, Token token, InetAddress endpoint)
- {
- for (Range range : strategy.getPendingAddressRanges(metadata, token,
endpoint))
- {
- metadata.addPendingRange(range, endpoint);
- }
- }
-
public void onJoin(InetAddress endpoint, EndPointState epState)
{
for (Map.Entry<String,ApplicationState> entry :
epState.getSortedApplicationStates())
@@ -1117,7 +1173,6 @@
{
SystemTable.setBootstrapped(false);
tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
- replicationStrategy_.removeObsoletePendingRanges();
if (logger_.isDebugEnabled())
logger_.debug("");
@@ -1238,7 +1293,6 @@
restoreReplicaCount(endPoint);
tokenMetadata_.removeEndpoint(endPoint);
- replicationStrategy_.removeObsoletePendingRanges();
}
// This is not the cleanest way as we're adding STATE_LEFT for
@@ -1261,11 +1315,6 @@
return replicationStrategy_;
}
- public void cancelPendingRanges()
- {
- tokenMetadata_.clearPendingRanges();
- }
-
public boolean isClientMode()
{
return isClientMode;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Wed Dec 16 21:28:55 2009
@@ -141,13 +141,6 @@
public void loadBalance() throws IOException, InterruptedException;
/**
- * cancel writes to nodes that are set to be changing ranges.
- * Only do this if the reason for the range changes no longer exists
- * (e.g., a bootstrapping node was killed or crashed.)
- */
- public void cancelPendingRanges();
-
- /**
* removeToken removes token (and all data associated with
* enpoint that had it) from the ring
*/
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Wed Dec 16 21:28:55 2009
@@ -398,11 +398,6 @@
ssProxy.move(newToken);
}
- public void cancelPendingRanges()
- {
- ssProxy.cancelPendingRanges();
- }
-
public void removeToken(String token)
{
ssProxy.removeToken(token);
@@ -503,7 +498,7 @@
HelpFormatter hf = new HelpFormatter();
String header = String.format(
"%nAvailable commands: ring, info, cleanup, compact, cfstats,
snapshot [name], clearsnapshot, " +
- "tpstats, flush, repair, decommission, move, loadbalance,
cancelpending, removetoken, " +
+ "tpstats, flush, repair, decommission, move, loadbalance,
removetoken, " +
" getcompactionthreshold, setcompactionthreshold
[minthreshold] ([maxthreshold])");
String usage = String.format("java %s -host <arg> <command>%n",
NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
@@ -578,10 +573,6 @@
}
probe.move(arguments[1]);
}
- else if (cmdName.equals("cancelpending"))
- {
- probe.cancelPendingRanges();
- }
else if (cmdName.equals("removetoken"))
{
if (arguments.length <= 1)
Propchange: incubator/cassandra/trunk/test/unit/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 16 21:28:55 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/test/unit/org:774578-796573
/incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-890627
+/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-891425
/incubator/cassandra/trunk/test/unit/org:749219-768583
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Wed Dec 16 21:28:55 2009
@@ -32,6 +32,7 @@
import com.google.common.collect.Multimap;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -61,7 +62,8 @@
Range range3 = ss.getPrimaryRangeForEndPoint(three);
Token fakeToken =
((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(),
range3.right());
assert range3.contains(fakeToken);
-
StorageService.updateBootstrapRanges(StorageService.instance().getReplicationStrategy(),
tmd, fakeToken, myEndpoint);
+ ss.onChange(myEndpoint, StorageService.STATE_BOOTSTRAPPING, new
ApplicationState(ss.getPartitioner().getTokenFactory().toString(fakeToken)));
+ tmd = ss.getTokenMetadata();
InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
assert two.equals(source2) : source2;
@@ -124,7 +126,7 @@
for (int i = 1; i <= numOldNodes; i++)
{
// leave .1 for myEndpoint
- tmd.update(p.getRandomToken(), InetAddress.getByName("127.0.0." +
(i + 1)));
+ tmd.updateNormalToken(p.getRandomToken(),
InetAddress.getByName("127.0.0." + (i + 1)));
}
}
}
\ No newline at end of file
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=891430&r1=891429&r2=891430&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Wed Dec 16 21:28:55 2009
@@ -79,7 +79,7 @@
for (int i = 0; i < endPointTokens.length; i++)
{
InetAddress ep = InetAddress.getByName("127.0.0." +
String.valueOf(i + 1));
- tmd.update(endPointTokens[i], ep);
+ tmd.updateNormalToken(endPointTokens[i], ep);
hosts.add(ep);
}
@@ -114,15 +114,16 @@
for (int i = 0; i < endPointTokens.length; i++)
{
InetAddress ep = InetAddress.getByName("127.0.0." +
String.valueOf(i + 1));
- tmd.update(endPointTokens[i], ep);
+ tmd.updateNormalToken(endPointTokens[i], ep);
hosts.add(ep);
}
//Add bootstrap node id=6
Token bsToken = new BigIntegerToken(String.valueOf(25));
InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
- StorageService.updateBootstrapRanges(strategy, tmd, bsToken,
bootstrapEndPoint);
-
+ tmd.addBootstrapToken(bsToken, bootstrapEndPoint);
+ StorageService.calculatePendingRanges(tmd, strategy);
+
for (int i = 0; i < keyTokens.length; i++)
{
Collection<InetAddress> endPoints =
strategy.getWriteEndpoints(keyTokens[i],
strategy.getNaturalEndpoints(keyTokens[i]));
@@ -136,6 +137,8 @@
// for 5, 15, 25 this should include bootstrap node
if (i < 3)
assertTrue(endPoints.contains(bootstrapEndPoint));
+ else
+ assertFalse(endPoints.contains(bootstrapEndPoint));
}
}
}