Author: jbellis
Date: Wed Aug 26 19:04:07 2009
New Revision: 808161

URL: http://svn.apache.org/viewvc?rev=808161&view=rev
Log:
1. Switch bootstrapNodes in TokenMetadata to Map<Token,EndPoint> so 
RackUnawareStrategy can use it.
2. Fix AbstractStrategy and RackUnawareStrategy to incorporate the
bootstrapping nodes for getHintedStorageEndPoints through
StorageService.getNStorageEndPointMap (now used by insert and
insertBlocking after 383)
3. Add unit test for RackUnawareStrategy to test if bootstrapping nodes are 
being returned correctly.

patch by Sandeep Tata; reviewed by jbellis for CASSANDRA-375

Modified:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java
 Wed Aug 26 19:04:07 2009
@@ -102,9 +102,14 @@
      */
     public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
     {
+        EndPoint[] topN = getStorageEndPointsForWrite( token );
+        return getHintedMapForEndpoints(topN);
+    }
+    
+    private Map<EndPoint, EndPoint> getHintedMapForEndpoints(EndPoint[] topN)
+    {
         List<EndPoint> liveList = new ArrayList<EndPoint>();
         Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
-        EndPoint[] topN = getStorageEndPoints( token );
 
         for( int i = 0 ; i < topN.length ; i++)
         {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
 Wed Aug 26 19:04:07 2009
@@ -33,6 +33,7 @@
 public interface IReplicaPlacementStrategy
 {
        public EndPoint[] getStorageEndPoints(Token token);
+       public EndPoint[] getStorageEndPointsForWrite(Token token);
     public Map<String, EndPoint[]> getStorageEndPoints(String[] keys);
     public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> 
tokenToEndPointMap);
     public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token);

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
 Wed Aug 26 19:04:07 2009
@@ -139,4 +139,15 @@
     {
         throw new UnsupportedOperationException("This operation is not 
currently supported");
     }
+
+    public EndPoint[] getStorageEndPointsForWrite(Token token)
+    {
+        throw new UnsupportedOperationException("Rack-aware bootstrapping not 
supported");
+    }
+
+    
+    public Map<EndPoint, EndPoint> getHintedStorageEndPointsForWrite(Token 
token)
+    {
+        throw new UnsupportedOperationException("Rack-aware bootstrapping not 
supported");
+    }
 }
\ No newline at end of file

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
 Wed Aug 26 19:04:07 2009
@@ -46,12 +46,47 @@
         return getStorageEndPoints(token, 
tokenMetadata_.cloneTokenEndPointMap());            
     }
     
+    public EndPoint[] getStorageEndPointsForWrite(Token token)
+    {
+        Map<Token, EndPoint> tokenToEndPointMap = 
tokenMetadata_.cloneTokenEndPointMap();
+        Map<Token, EndPoint> bootstrapTokensToEndpointMap = 
tokenMetadata_.cloneBootstrapNodes();
+        List<Token> tokenList = getStorageTokens(token, tokenToEndPointMap, 
bootstrapTokensToEndpointMap);
+        List<EndPoint> list = new ArrayList<EndPoint>();
+        for (Token t: tokenList)
+        {
+            EndPoint e = tokenToEndPointMap.get(t);
+            if (e == null) 
+                e = bootstrapTokensToEndpointMap.get(t); 
+            assert e != null;
+            list.add(e);
+        }
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[list.size()]);            
+    }
+    
     public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> 
tokenToEndPointMap)
     {
-        int startIndex;
+        List<Token> tokenList = getStorageTokens(token, tokenToEndPointMap, 
null);
         List<EndPoint> list = new ArrayList<EndPoint>();
+        for (Token t: tokenList)
+            list.add(tokenToEndPointMap.get(t));
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[list.size()]);
+    }
+
+    private List<Token> getStorageTokens(Token token, Map<Token, EndPoint> 
tokenToEndPointMap, Map<Token, EndPoint> bootStrapTokenToEndPointMap)
+    {
+        int startIndex;
+        List<Token> tokenList = new ArrayList<Token>();
         int foundCount = 0;
         List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+        List<Token> bsTokens = null;
+        
+        if (bootStrapTokenToEndPointMap != null)
+        {
+            bsTokens = new 
ArrayList<Token>(bootStrapTokenToEndPointMap.keySet());
+            tokens.addAll(bsTokens);
+        }
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
@@ -61,22 +96,24 @@
                 index = 0;
         }
         int totalNodes = tokens.size();
-        // Add the node at the index by default
-        list.add(tokenToEndPointMap.get(tokens.get(index)));
-        foundCount++;
+        // Add the token at the index by default
+        tokenList.add((Token)tokens.get(index));
+        if (bsTokens == null || !bsTokens.contains(tokens.get(index)))
+            foundCount++;
         startIndex = (index + 1)%totalNodes;
         // If we found N number of nodes we are good. This loop will just 
exit. Otherwise just
         // loop through the list and add until we have N nodes.
         for (int i = startIndex, count = 1; count < totalNodes && foundCount < 
replicas_; ++count, i = (i+1)%totalNodes)
         {
-            if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+            if(!tokenList.contains(tokens.get(i)))
             {
-                list.add(tokenToEndPointMap.get(tokens.get(i)));
-                foundCount++;
+                tokenList.add((Token)tokens.get(i));
+                //Don't count bootstrapping tokens towards the count
+                if (bsTokens==null || !bsTokens.contains(tokens.get(i)))
+                    foundCount++;
             }
         }
-        retrofitPorts(list);
-        return list.toArray(new EndPoint[list.size()]);
+        return tokenList;
     }
             
     public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
@@ -87,7 +124,6 @@
         {
             results.put(key, getStorageEndPoints(partitioner_.getToken(key)));
         }
-
         return results;
     }
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
 Wed Aug 26 19:04:07 2009
@@ -32,7 +32,7 @@
     /* Maintains a reverse index of endpoint to token in the cluster. */
     private Map<EndPoint, Token> endPointToTokenMap_ = new HashMap<EndPoint, 
Token>();
     /* Bootstrapping nodes and their tokens */
-    private Map<EndPoint, Token> bootstrapNodes = 
Collections.synchronizedMap(new HashMap<EndPoint, Token>());
+    private Map<Token, EndPoint> bootstrapNodes = 
Collections.synchronizedMap(new HashMap<Token, EndPoint>());
     
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -40,19 +40,19 @@
     public TokenMetadata()
     {
     }
-    
-    public TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, 
Map<EndPoint, Token> endPointToTokenMap, Map<EndPoint, Token> bootstrapNodes)
+
+    public TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, 
Map<EndPoint, Token> endPointToTokenMap, Map<Token, EndPoint> bootstrapNodes)
     {
         tokenToEndPointMap_ = tokenToEndPointMap;
         endPointToTokenMap_ = endPointToTokenMap;
-        this.bootstrapNodes = bootstrapNodes;
+        this.bootstrapNodes = bootstrapNodes; 
     }
     
     public TokenMetadata cloneMe()
     {
         return new TokenMetadata(cloneTokenEndPointMap(), 
cloneEndPointTokenMap(), cloneBootstrapNodes());
     }
-    
+        
     public void update(Token token, EndPoint endpoint)
     {
         this.update(token, endpoint, false);
@@ -67,12 +67,12 @@
         {
             if (bootstrapState)
             {
-                bootstrapNodes.put(endpoint, token);
+                bootstrapNodes.put(token, endpoint);
                 this.remove(endpoint);
             }
             else
             {
-                bootstrapNodes.remove(endpoint); // If this happened to be 
there 
+                bootstrapNodes.remove(token); // If this happened to be there 
                 Token oldToken = endPointToTokenMap_.get(endpoint);
                 if ( oldToken != null )
                     tokenToEndPointMap_.remove(oldToken);
@@ -168,12 +168,12 @@
         }
     }
     
-    public Map<EndPoint, Token> cloneBootstrapNodes()
+    public Map<Token, EndPoint> cloneBootstrapNodes()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<EndPoint, Token>( bootstrapNodes );
+            return new HashMap<Token, EndPoint>( bootstrapNodes );
         }
         finally
         {

Modified: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
 (original)
+++ 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
 Wed Aug 26 19:04:07 2009
@@ -18,6 +18,9 @@
 */
 package org.apache.cassandra.locator;
 
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
 import java.util.List;
 import java.util.ArrayList;
 
@@ -79,11 +82,57 @@
         for (int i = 0; i < keyTokens.length; i++)
         {
             EndPoint[] endPoints = strategy.getStorageEndPoints(keyTokens[i]);
-            assert endPoints.length == 3;
+            assertEquals(3, endPoints.length);
             for (int j = 0; j < endPoints.length; j++)
             {
-                assert endPoints[j] == hosts.get((i + j + 1) % hosts.size());
+                assertEquals(endPoints[j], hosts.get((i + j + 1) % 
hosts.size()));
+            }
+        }
+    }
+    
+    @Test
+    public void testGetStorageEndPointsDuringBootstrap()
+    {
+        TokenMetadata tmd = new TokenMetadata();
+        IPartitioner partitioner = new RandomPartitioner();
+        IReplicaPlacementStrategy strategy = new RackUnawareStrategy(tmd, 
partitioner, 3, 7000);
+
+        Token[] endPointTokens = new Token[5]; 
+        Token[] keyTokens = new Token[5];
+        
+        for (int i = 0; i < 5; i++) 
+        {
+            endPointTokens[i] = new BigIntegerToken(String.valueOf(10 * i));
+            keyTokens[i] = new BigIntegerToken(String.valueOf(10 * i + 5));
+        }
+        
+        List<EndPoint> hosts = new ArrayList<EndPoint>();
+        for (int i = 0; i < endPointTokens.length; i++)
+        {
+            EndPoint ep = new EndPoint("127.0.0." + String.valueOf(i + 1), 
7001);
+            tmd.update(endPointTokens[i], ep);
+            hosts.add(ep);
+        }
+        
+        //Add bootstrap node id=6
+        Token bsToken = new BigIntegerToken(String.valueOf(25));
+        EndPoint bootstrapEndPoint = new EndPoint("127.0.0.6", 7001);
+        tmd.update(bsToken, bootstrapEndPoint, true);
+        
+        for (int i = 0; i < keyTokens.length; i++)
+        {
+            EndPoint[] endPoints = 
strategy.getStorageEndPointsForWrite(keyTokens[i]);
+            assertTrue(endPoints.length >=3);
+            List<EndPoint> endPointsList = Arrays.asList(endPoints);
+
+            for (int j = 0; j < 3; j++)
+            {
+                //Check that the old nodes are definitely included
+                assertTrue(endPointsList.contains(hosts.get((i + j + 1) % 
hosts.size())));   
             }
+            // for 5, 15, 25 this should include bootstrap node
+            if (i < 3)
+                assertTrue(endPointsList.contains(bootstrapEndPoint));
         }
     }
 }


Reply via email to