Updated Branches:
  refs/heads/cassandra-1.2 4b2923a46 -> debd8f017

Fix primary range ignores replication strategy; patch by yukim; reviewed by 
jbellis for CASSANDRA-5424


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/debd8f01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/debd8f01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/debd8f01

Branch: refs/heads/cassandra-1.2
Commit: debd8f0176038f6933978eb9b4f8ee6adc07b541
Parents: 4b2923a
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Apr 15 22:01:37 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Apr 19 16:31:22 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/locator/NetworkTopologyStrategy.java |    3 +-
 .../apache/cassandra/service/StorageService.java   |   46 +++-
 .../service/AntiEntropyServiceTestAbstract.java    |    2 +-
 .../service/StorageServiceServerTest.java          |  187 ++++++++++++++-
 5 files changed, 223 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8c4cced..eff7c49 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
  * Fix preparing statements when current keyspace is not set (CASSANDRA-5468)
  * Fix SemanticVersion.isSupportedBy minor/patch handling (CASSANDRA-5496)
  * Don't provide oldCfId for post-1.1 system cfs (CASSANDRA-5490)
+ * Fix primary range ignores replication strategy (CASSANDRA-5424)
 Merged from 1.1
  * Add retry mechanism to OTC for non-droppable_verbs (CASSANDRA-5393)
  * Use allocator information to improve memtable memory usage estimate 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java 
b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 91478e8..d354019 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -78,7 +78,8 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
     @SuppressWarnings("serial")
     public List<InetAddress> calculateNaturalEndpoints(Token searchToken, 
TokenMetadata tokenMetadata)
     {
-        Set<InetAddress> replicas = new HashSet<InetAddress>();
+        // we want to preserve insertion order so that the first added 
endpoint becomes primary
+        Set<InetAddress> replicas = new LinkedHashSet<InetAddress>();
         // replicas we have found in each DC
         Map<String, Set<InetAddress>> dcReplicas = new HashMap<String, 
Set<InetAddress>>(datacenters.size())
         {{

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 658f583..b7bf6f4 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -145,9 +145,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return getRangesForEndpoint(table, FBUtilities.getBroadcastAddress());
     }
 
-    public Collection<Range<Token>> getLocalPrimaryRanges()
+    public Collection<Range<Token>> getLocalPrimaryRanges(String keyspace)
     {
-        return getPrimaryRangesForEndpoint(FBUtilities.getBroadcastAddress());
+        return getPrimaryRangesForEndpoint(keyspace, 
FBUtilities.getBroadcastAddress());
     }
 
     @Deprecated
@@ -2336,13 +2336,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     }
     public int forceRepairAsync(final String keyspace, final boolean 
isSequential, final boolean isLocal, final boolean primaryRange, final 
String... columnFamilies)
     {
-        final Collection<Range<Token>> ranges = primaryRange ? 
getLocalPrimaryRanges() : getLocalRanges(keyspace);
+        final Collection<Range<Token>> ranges = primaryRange ? 
getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
         return forceRepairAsync(keyspace, isSequential, isLocal, ranges, 
columnFamilies);
     }
 
     public int forceRepairAsync(final String keyspace, final boolean 
isSequential, final boolean isLocal, final Collection<Range<Token>> ranges, 
final String... columnFamilies)
     {
-        if (Table.SYSTEM_KS.equals(keyspace) || 
Tracing.TRACE_KS.equals(keyspace))
+        if (Table.SYSTEM_KS.equals(keyspace) || 
Tracing.TRACE_KS.equals(keyspace) || ranges.isEmpty())
             return 0;
 
         final int cmd = nextRepairCommand.incrementAndGet();
@@ -2377,7 +2377,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public void forceTableRepairPrimaryRange(final String tableName, boolean 
isSequential, boolean  isLocal, final String... columnFamilies) throws 
IOException
     {
-        forceTableRepairRange(tableName, getLocalPrimaryRanges(), 
isSequential, isLocal, columnFamilies);
+        forceTableRepairRange(tableName, getLocalPrimaryRanges(tableName), 
isSequential, isLocal, columnFamilies);
     }
 
     public void forceTableRepairRange(String beginToken, String endToken, 
final String tableName, boolean isSequential, boolean  isLocal, final String... 
columnFamilies) throws IOException
@@ -2505,17 +2505,36 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Get the primary ranges for the specified endpoint.
+     * Get the "primary ranges" for the specified keyspace and endpoint.
+     * "Primary ranges" are the ranges whose primary replica the node is 
responsible for storing.
+     * The node that stores the primary replica is defined as the first node 
returned
+     * by {@link AbstractReplicationStrategy#calculateNaturalEndpoints}.
+     *
+     * @param keyspace
      * @param ep endpoint we are interested in.
-     * @return collection of ranges for the specified endpoint.
+     * @return primary ranges for the specified endpoint.
      */
-    public Collection<Range<Token>> getPrimaryRangesForEndpoint(InetAddress ep)
+    public Collection<Range<Token>> getPrimaryRangesForEndpoint(String 
keyspace, InetAddress ep)
     {
-        return tokenMetadata.getPrimaryRangesFor(tokenMetadata.getTokens(ep));
+        AbstractReplicationStrategy strategy = 
Table.open(keyspace).getReplicationStrategy();
+        Collection<Range<Token>> primaryRanges = new HashSet<Range<Token>>();
+        TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
+        for (Token token : metadata.sortedTokens())
+        {
+            List<InetAddress> endpoints = 
strategy.calculateNaturalEndpoints(token, metadata);
+            if (endpoints.size() > 0 && endpoints.get(0).equals(ep))
+                primaryRanges.add(new 
Range<Token>(metadata.getPredecessor(token), token));
+        }
+        return primaryRanges;
     }
 
     /**
-     * Get the primary range for the specified endpoint.
+     * Previously, the primary range was the range that the node is responsible for, 
calculated
+     * only from the token assigned to the node.
+     * This does not take the replication strategy into account, and therefore 
returns an insufficient
+     * range, especially when using NTS with replication only to certain DCs (see 
CASSANDRA-5424).
+     *
+     * @deprecated
      * @param ep endpoint we are interested in.
      * @return range for the specified endpoint.
      */
@@ -3834,8 +3853,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     public List<String> sampleKeyRange() // do not rename to getter - see 
CASSANDRA-4452 for details
     {
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
-        for (Range<Token> range : getLocalPrimaryRanges())
-            keys.addAll(keySamples(ColumnFamilyStore.allUserDefined(), range));
+        for (Table keyspace : Table.nonSystem())
+        {
+            for (Range<Token> range : 
getPrimaryRangesForEndpoint(keyspace.name, FBUtilities.getBroadcastAddress()))
+                keys.addAll(keySamples(keyspace.getColumnFamilyStores(), 
range));
+        }
 
         List<String> sampledKeys = new ArrayList<String>(keys.size());
         for (DecoratedKey key : keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java 
b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index ef7a2ab..7124ecb 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -102,7 +102,7 @@ public abstract class AntiEntropyServiceTestAbstract 
extends SchemaLoader
 
         Gossiper.instance.initializeNodeUnsafe(REMOTE, UUID.randomUUID(), 1);
 
-        local_range = StorageService.instance.getLocalPrimaryRange();
+        local_range = 
StorageService.instance.getPrimaryRangesForEndpoint(tablename, 
LOCAL).iterator().next();
 
         // (we use REMOTE instead of LOCAL so that the reponses for the 
validator.complete() get lost)
         request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, 
local_range, new CFPair(tablename, cfname));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/debd8f01/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java 
b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 39fbb4a..5ce9160 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -21,18 +21,28 @@ package org.apache.cassandra.service;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
+import java.net.InetAddress;
+import java.util.*;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.StringToken;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.PropertyFileSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -40,6 +50,13 @@ import static org.junit.Assert.assertTrue;
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class StorageServiceServerTest
 {
+    @BeforeClass
+    public static void setUp() throws ConfigurationException
+    {
+        IEndpointSnitch snitch = new PropertyFileSnitch();
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+    }
+
     @Test
     public void testRegularMode() throws IOException, InterruptedException, 
ConfigurationException
     {
@@ -79,4 +96,170 @@ public class StorageServiceServerTest
         StorageService.instance.takeColumnFamilySnapshot(Table.SYSTEM_KS, 
"Schema", "cf_snapshot");
     }
 
+    @Test
+    public void testPrimaryRangesWithNetworkTopologyStrategy() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), 
InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), 
InetAddress.getByName("127.0.0.2"));
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), 
InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), 
InetAddress.getByName("127.0.0.5"));
+
+        Map<String, String> configOptions = new HashMap<String, String>();
+        configOptions.put("DC1", "1");
+        configOptions.put("DC2", "1");
+
+        Table.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", 
"NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setTableDefinition(meta);
+
+        Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.1"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("D"), 
new StringToken("A")));
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.2"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("B"), 
new StringToken("C")));
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.4"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("A"), 
new StringToken("B")));
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.5"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("C"), 
new StringToken("D")));
+    }
+
+    @Test
+    public void testPrimaryRangesWithNetworkTopologyStrategyOneDCOnly() throws 
Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), 
InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), 
InetAddress.getByName("127.0.0.2"));
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), 
InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), 
InetAddress.getByName("127.0.0.5"));
+
+        Map<String, String> configOptions = new HashMap<String, String>();
+        configOptions.put("DC2", "2");
+
+        Table.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", 
"NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setTableDefinition(meta);
+
+        // endpoints in DC1 should not have primary range
+        Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.1"));
+        assert primaryRanges.isEmpty();
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.2"));
+        assert primaryRanges.isEmpty();
+
+        // endpoints in DC2 should have primary ranges which also cover DC1
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.4"));
+        assert primaryRanges.size() == 2;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("D"), 
new StringToken("A")));
+        assert primaryRanges.contains(new Range<Token>(new StringToken("A"), 
new StringToken("B")));
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.5"));
+        assert primaryRanges.size() == 2;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("C"), 
new StringToken("D")));
+        assert primaryRanges.contains(new Range<Token>(new StringToken("B"), 
new StringToken("C")));
+    }
+
+    @Test
+    public void testPrimaryRangesWithVnodes() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+        // DC1
+        Multimap<InetAddress, Token> dc1 = HashMultimap.create();
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A"));
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E"));
+        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I"));
+        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J"));
+        metadata.updateNormalTokens(dc1);
+        // DC2
+        Multimap<InetAddress, Token> dc2 = HashMultimap.create();
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B"));
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G"));
+        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F"));
+        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
+        metadata.updateNormalTokens(dc2);
+
+        Map<String, String> configOptions = new HashMap<String, String>();
+        configOptions.put("DC2", "2");
+
+        Table.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", 
"NetworkTopologyStrategy", configOptions, false);
+        Schema.instance.setTableDefinition(meta);
+
+        // endpoints in DC1 should not have primary range
+        Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.1"));
+        assert primaryRanges.isEmpty();
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.2"));
+        assert primaryRanges.isEmpty();
+
+        // endpoints in DC2 should have primary ranges which also cover DC1
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.4"));
+        assert primaryRanges.size() == 4;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("A"), 
new StringToken("B")));
+        assert primaryRanges.contains(new Range<Token>(new StringToken("F"), 
new StringToken("G")));
+        assert primaryRanges.contains(new Range<Token>(new StringToken("K"), 
new StringToken("L")));
+        // because /127.0.0.4 holds token "B", which is the next token after "A" 
from /127.0.0.1,
+        // the node covers range (L, A]
+        assert primaryRanges.contains(new Range<Token>(new StringToken("L"), 
new StringToken("A")));
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.5"));
+        assert primaryRanges.size() == 8;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("C"), 
new StringToken("D")));
+        assert primaryRanges.contains(new Range<Token>(new StringToken("E"), 
new StringToken("F")));
+        assert primaryRanges.contains(new Range<Token>(new StringToken("J"), 
new StringToken("K")));
+        // ranges from /127.0.0.1
+        assert primaryRanges.contains(new Range<Token>(new StringToken("D"), 
new StringToken("E")));
+        // the next token to "H" in DC2 is "K" in /127.0.0.5, so (G, H] goes 
to /127.0.0.5
+        assert primaryRanges.contains(new Range<Token>(new StringToken("G"), 
new StringToken("H")));
+        // ranges from /127.0.0.2
+        assert primaryRanges.contains(new Range<Token>(new StringToken("B"), 
new StringToken("C")));
+        assert primaryRanges.contains(new Range<Token>(new StringToken("H"), 
new StringToken("I")));
+        assert primaryRanges.contains(new Range<Token>(new StringToken("I"), 
new StringToken("J")));
+    }
+    @Test
+    public void testPrimaryRangesWithSimpleStrategy() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        metadata.updateNormalToken(new StringToken("A"), 
InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("B"), 
InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("C"), 
InetAddress.getByName("127.0.0.3"));
+
+        Map<String, String> configOptions = new HashMap<String, String>();
+        configOptions.put("replication_factor", "2");
+
+        Table.clear("Keyspace1");
+        KSMetaData meta = KSMetaData.newKeyspace("Keyspace1", 
"SimpleStrategy", configOptions, false);
+        Schema.instance.setTableDefinition(meta);
+
+        Collection<Range<Token>> primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.1"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("C"), 
new StringToken("A")));
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.2"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("A"), 
new StringToken("B")));
+
+        primaryRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(meta.name, 
InetAddress.getByName("127.0.0.3"));
+        assert primaryRanges.size() == 1;
+        assert primaryRanges.contains(new Range<Token>(new StringToken("B"), 
new StringToken("C")));
+    }
 }

Reply via email to