Author: jbellis
Date: Mon Apr 5 21:01:52 2010
New Revision: 930960
URL: http://svn.apache.org/viewvc?rev=930960&view=rev
Log:
fix neighbor calculation for anti-entropy repair. patch by Stu Hood; reviewed
by jbellis for CASSANDRA-924
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=930960&r1=930959&r2=930960&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Apr 5 21:01:52 2010
@@ -6,6 +6,7 @@
* fix command line arguments inversion in clustertool (CASSANDRA-942)
* fix race condition that could trigger a false-positive assertion
during post-flush discard of old commitlog segments (CASSANDRA-936)
+ * fix neighbor calculation for anti-entropy repair (CASSANDRA-924)
0.6.0-RC1
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=930960&r1=930959&r2=930960&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
Mon Apr 5 21:01:52 2010
@@ -43,9 +43,6 @@ import org.apache.cassandra.utils.*;
import org.apache.log4j.Logger;
-import com.google.common.collect.Collections2;
-import com.google.common.base.Predicates;
-
/**
* AntiEntropyService encapsulates "validating" (hashing) individual column
families,
* exchanging MerkleTrees with remote nodes via a TreeRequest/Response
conversation,
@@ -141,12 +138,18 @@ public class AntiEntropyService
/**
* Return all of the neighbors with whom we share data.
*/
- private static Collection<InetAddress> getNeighbors(String table)
+ public static Set<InetAddress> getNeighbors(String table)
{
- InetAddress local = FBUtilities.getLocalAddress();
StorageService ss = StorageService.instance;
- return Collections2.filter(ss.getNaturalEndpoints(table,
ss.getLocalToken()),
- Predicates.not(Predicates.equalTo(local)));
+ Set<InetAddress> neighbors = new HashSet<InetAddress>();
+ Map<Range, List<InetAddress>> replicaSets =
ss.getRangeToAddressMap(table);
+ for (Range range : ss.getLocalRanges(table))
+ {
+ // for every range stored locally (replica or original) collect
neighbors storing copies
+ neighbors.addAll(replicaSets.get(range));
+ }
+ neighbors.remove(FBUtilities.getLocalAddress());
+ return neighbors;
}
/**
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=930960&r1=930959&r2=930960&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Mon Apr 5 21:01:52 2010
@@ -1063,7 +1063,8 @@ public class StorageService implements I
{
// request that all relevant endpoints generate trees
final MessagingService ms = MessagingService.instance;
- final List<InetAddress> endpoints = getNaturalEndpoints(tableName,
getLocalToken());
+ final Set<InetAddress> endpoints =
AntiEntropyService.getNeighbors(tableName);
+ endpoints.add(FBUtilities.getLocalAddress());
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName,
columnFamilies))
{
Message request = TreeRequestVerbHandler.makeVerb(tableName,
cfStore.getColumnFamilyName());
Modified:
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=930960&r1=930959&r2=930960&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
(original)
+++
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Mon Apr 5 21:01:52 2010
@@ -31,6 +31,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.CompactionIterator.CompactedRow;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import static org.apache.cassandra.service.AntiEntropyService.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -41,40 +42,39 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.Util;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class AntiEntropyServiceTest extends CleanupHelper
{
// table and column family to test against
- public AntiEntropyService aes;
+ public static AntiEntropyService aes;
public static String tablename;
public static String cfname;
public static InetAddress LOCAL, REMOTE;
- private static boolean initialized;
+ @BeforeClass
+ public static void prepareClass() throws Exception
+ {
+ LOCAL = FBUtilities.getLocalAddress();
+ tablename = "Keyspace4";
+ StorageService.instance.initServer();
+ // generate a fake endpoint for which we can spoof receiving/sending
trees
+ REMOTE = InetAddress.getByName("127.0.0.2");
+ cfname = Table.open(tablename).getColumnFamilies().iterator().next();
+ }
@Before
public void prepare() throws Exception
{
- if (!initialized)
- {
- LOCAL = FBUtilities.getLocalAddress();
- tablename = "Keyspace4";
-
- StorageService.instance.initServer();
- // generate a fake endpoint for which we can spoof
receiving/sending trees
- TokenMetadata tmd = StorageService.instance.getTokenMetadata();
- IPartitioner part = StorageService.getPartitioner();
- REMOTE = InetAddress.getByName("127.0.0.2");
- tmd.updateNormalToken(part.getMinimumToken(), REMOTE);
- assert tmd.isMember(REMOTE);
-
- cfname =
Table.open(tablename).getColumnFamilies().iterator().next();
- initialized = true;
- }
aes = AntiEntropyService.instance;
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.clearUnsafe();
+
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), LOCAL);
+
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(),
REMOTE);
+ assert tmd.isMember(REMOTE);
}
@Test
@@ -202,6 +202,32 @@ public class AntiEntropyServiceTest exte
}
@Test
+ public void testGetNeighborsPlusOne() throws Throwable
+ {
+ // generate rf+1 nodes, and ensure that all nodes are returned
+ Set<InetAddress> expected = addTokens(1 + 1 +
DatabaseDescriptor.getReplicationFactor(tablename));
+ expected.remove(FBUtilities.getLocalAddress());
+ assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
+ }
+
+ @Test
+ public void testGetNeighborsTimesTwo() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by
the ARS are returned
+ addTokens(1 + (2 *
DatabaseDescriptor.getReplicationFactor(tablename)));
+ AbstractReplicationStrategy ars =
StorageService.instance.getReplicationStrategy(tablename);
+ Set<InetAddress> expected = new HashSet<InetAddress>();
+ for (Range replicaRange :
ars.getAddressRanges(tablename).get(FBUtilities.getLocalAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd,
tablename).get(replicaRange));
+ }
+ expected.remove(FBUtilities.getLocalAddress());
+ assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
+ }
+
+ @Test
public void testDifferencer() throws Throwable
{
// generate a tree
@@ -232,6 +258,19 @@ public class AntiEntropyServiceTest exte
assertEquals("Wrong differing range", changed,
diff.differences.get(0));
}
+ Set<InetAddress> addTokens(int max) throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ Set<InetAddress> endpoints = new HashSet<InetAddress>();
+ for (int i = 1; i < max; i++)
+ {
+ InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
+
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(),
endpoint);
+ endpoints.add(endpoint);
+ }
+ return endpoints;
+ }
+
Future<Object> flushAES()
{
return StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(new
Callable<Object>()