Author: jbellis
Date: Mon Apr  5 21:01:52 2010
New Revision: 930960

URL: http://svn.apache.org/viewvc?rev=930960&view=rev
Log:
fix neighbor calculation for anti-entropy repair.  patch by Stu Hood; reviewed 
by jbellis for CASSANDRA-924

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=930960&r1=930959&r2=930960&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Apr  5 21:01:52 2010
@@ -6,6 +6,7 @@
  * fix command line arguments inversion in clustertool (CASSANDRA-942)
  * fix race condition that could trigger a false-positive assertion
    during post-flush discard of old commitlog segments (CASSANDRA-936)
+ * fix neighbor calculation for anti-entropy repair (CASSANDRA-924)
 
 
 0.6.0-RC1

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=930960&r1=930959&r2=930960&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
 Mon Apr  5 21:01:52 2010
@@ -43,9 +43,6 @@ import org.apache.cassandra.utils.*;
 
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Collections2;
-import com.google.common.base.Predicates;
-
 /**
  * AntiEntropyService encapsulates "validating" (hashing) individual column 
families,
  * exchanging MerkleTrees with remote nodes via a TreeRequest/Response 
conversation,
@@ -141,12 +138,18 @@ public class AntiEntropyService
     /**
      * Return all of the neighbors with whom we share data.
      */
-    private static Collection<InetAddress> getNeighbors(String table)
+    public static Set<InetAddress> getNeighbors(String table)
     {
-        InetAddress local = FBUtilities.getLocalAddress();
         StorageService ss = StorageService.instance;
-        return Collections2.filter(ss.getNaturalEndpoints(table, 
ss.getLocalToken()),
-                                   Predicates.not(Predicates.equalTo(local)));
+        Set<InetAddress> neighbors = new HashSet<InetAddress>();
+        Map<Range, List<InetAddress>> replicaSets = 
ss.getRangeToAddressMap(table);
+        for (Range range : ss.getLocalRanges(table))
+        {
+            // for every range stored locally (replica or original) collect 
neighbors storing copies
+            neighbors.addAll(replicaSets.get(range));
+        }
+        neighbors.remove(FBUtilities.getLocalAddress());
+        return neighbors;
     }
 
     /**

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=930960&r1=930959&r2=930960&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
 Mon Apr  5 21:01:52 2010
@@ -1063,7 +1063,8 @@ public class StorageService implements I
     {
         // request that all relevant endpoints generate trees
         final MessagingService ms = MessagingService.instance;
-        final List<InetAddress> endpoints = getNaturalEndpoints(tableName, 
getLocalToken());
+        final Set<InetAddress> endpoints = 
AntiEntropyService.getNeighbors(tableName);
+        endpoints.add(FBUtilities.getLocalAddress());
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
         {
             Message request = TreeRequestVerbHandler.makeVerb(tableName, 
cfStore.getColumnFamilyName());

Modified: 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=930960&r1=930959&r2=930960&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
 Mon Apr  5 21:01:52 2010
@@ -31,6 +31,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.CompactionIterator.CompactedRow;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import static org.apache.cassandra.service.AntiEntropyService.*;
 import org.apache.cassandra.utils.FBUtilities;
@@ -41,40 +42,39 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.Util;
 
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
 public class AntiEntropyServiceTest extends CleanupHelper
 {
     // table and column family to test against
-    public AntiEntropyService aes;
+    public static AntiEntropyService aes;
 
     public static String tablename;
     public static String cfname;
     public static InetAddress LOCAL, REMOTE;
 
-    private static boolean initialized;
+    @BeforeClass
+    public static void prepareClass() throws Exception
+    {
+        LOCAL = FBUtilities.getLocalAddress();
+        tablename = "Keyspace4";
+        StorageService.instance.initServer();
+        // generate a fake endpoint for which we can spoof receiving/sending 
trees
+        REMOTE = InetAddress.getByName("127.0.0.2");
+        cfname = Table.open(tablename).getColumnFamilies().iterator().next();
+    }
 
     @Before
     public void prepare() throws Exception
     {
-        if (!initialized)
-        {
-            LOCAL = FBUtilities.getLocalAddress();
-            tablename = "Keyspace4";
-
-            StorageService.instance.initServer();
-            // generate a fake endpoint for which we can spoof 
receiving/sending trees
-            TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-            IPartitioner part = StorageService.getPartitioner();
-            REMOTE = InetAddress.getByName("127.0.0.2");
-            tmd.updateNormalToken(part.getMinimumToken(), REMOTE);
-            assert tmd.isMember(REMOTE);
-
-            cfname = 
Table.open(tablename).getColumnFamilies().iterator().next();
-            initialized = true;
-        }
         aes = AntiEntropyService.instance;
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.clearUnsafe();
+        
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), LOCAL);
+        
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), 
REMOTE);
+        assert tmd.isMember(REMOTE);
     }
 
     @Test
@@ -202,6 +202,32 @@ public class AntiEntropyServiceTest exte
     }
 
     @Test
+    public void testGetNeighborsPlusOne() throws Throwable
+    {
+        // generate rf+1 nodes, and ensure that all nodes are returned
+        Set<InetAddress> expected = addTokens(1 + 1 + 
DatabaseDescriptor.getReplicationFactor(tablename));
+        expected.remove(FBUtilities.getLocalAddress());
+        assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
+    }
+
+    @Test
+    public void testGetNeighborsTimesTwo() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        // generate rf*2 nodes, and ensure that only neighbors specified by 
the ARS are returned
+        addTokens(1 + (2 * 
DatabaseDescriptor.getReplicationFactor(tablename)));
+        AbstractReplicationStrategy ars = 
StorageService.instance.getReplicationStrategy(tablename);
+        Set<InetAddress> expected = new HashSet<InetAddress>();
+        for (Range replicaRange : 
ars.getAddressRanges(tablename).get(FBUtilities.getLocalAddress()))
+        {
+            expected.addAll(ars.getRangeAddresses(tmd, 
tablename).get(replicaRange));
+        }
+        expected.remove(FBUtilities.getLocalAddress());
+        assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
+    }
+
+    @Test
     public void testDifferencer() throws Throwable
     {
         // generate a tree
@@ -232,6 +258,19 @@ public class AntiEntropyServiceTest exte
         assertEquals("Wrong differing range", changed, 
diff.differences.get(0));
     }
 
+    Set<InetAddress> addTokens(int max) throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        Set<InetAddress> endpoints = new HashSet<InetAddress>();
+        for (int i = 1; i < max; i++)
+        {
+            InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
+            
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), 
endpoint);
+            endpoints.add(endpoint);
+        }
+        return endpoints;
+    }
+
     Future<Object> flushAES()
     {
         return StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(new 
Callable<Object>()


Reply via email to