Repository: cassandra Updated Branches: refs/heads/trunk 0ad056432 -> 507e4a46a
Fix incorrect sorting of replicas in SimpleStrategy.calculateNaturalReplicas patch by Joseph Lynch; reviewed by Benedict Elliott Smith for CASSANDRA-14862 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/507e4a46 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/507e4a46 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/507e4a46 Branch: refs/heads/trunk Commit: 507e4a46a166cab5322a50fbe40c80cb0d16c290 Parents: 0ad0564 Author: Joseph Lynch <joe.e.ly...@gmail.com> Authored: Wed Oct 31 20:51:34 2018 -0700 Committer: Aleksey Yeshchenko <alek...@apple.com> Committed: Wed Nov 7 18:43:30 2018 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + cafd44c8d9ae24c953a8d82746fc89bfe2465641.patch | 180 +++++++++++++++++++ .../locator/AbstractReplicationStrategy.java | 12 +- .../cassandra/locator/SimpleStrategy.java | 5 +- .../cassandra/locator/SimpleStrategyTest.java | 61 ++++++- 5 files changed, 247 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/507e4a46/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2373cb2..4081fce 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Fix incorrect sorting of replicas in SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862) * Partitioned outbound internode TCP connections can occur when nodes restart (CASSANDRA-14358) * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841) * Avoid running query to self through messaging service (CASSANDRA-14807) http://git-wip-us.apache.org/repos/asf/cassandra/blob/507e4a46/cafd44c8d9ae24c953a8d82746fc89bfe2465641.patch ---------------------------------------------------------------------- diff --git a/cafd44c8d9ae24c953a8d82746fc89bfe2465641.patch b/cafd44c8d9ae24c953a8d82746fc89bfe2465641.patch new file mode 100644 index 0000000..0f7e423 --- /dev/null +++ b/cafd44c8d9ae24c953a8d82746fc89bfe2465641.patch @@ -0,0 +1,180 @@ +From cafd44c8d9ae24c953a8d82746fc89bfe2465641 Mon Sep 17 00:00:00 2001 +From: Joseph Lynch <joe.e.ly...@gmail.com> +Date: Wed, 31 Oct 2018 20:51:34 -0700 +Subject: [PATCH] Fixes incorrect sorting of replicas in + SimpleStrategy.calculateNaturalReplicas + +A small bug was introduced in e645b917 which would change SimpleStrategy +primary replica sets. This patch adds a regression test that emulates +the dtest that was broken by this change +(TestTopology.test_size_estimates_multidc) which fails before this patch +and succeeds after. +--- + .../locator/AbstractReplicationStrategy.java | 12 +++- + .../cassandra/locator/SimpleStrategy.java | 5 +- + .../cassandra/locator/SimpleStrategyTest.java | 61 +++++++++++++++++-- + 3 files changed, 66 insertions(+), 12 deletions(-) + +diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +index 818e20efe93..deb43c65478 100644 +--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ++++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +@@ -132,11 +132,19 @@ public Replica getLocalReplicaFor(RingPosition searchPosition) + } + + /** +- * calculate the natural endpoints for the given token ++ * Calculate the natural endpoints for the given token. Endpoints are returned in the order ++ * they occur in the ring following the searchToken, as defined by the replication strategy. ++ * ++ * Note that the order of the replicas is _implicitly relied upon_ by the definition of ++ * "primary" range in ++ * {@link org.apache.cassandra.service.StorageService#getPrimaryRangesForEndpoint(String, InetAddressAndPort)} ++ * which is in turn relied on by various components like repair and size estimate calculations. + * + * @see #getNaturalReplicasForToken(org.apache.cassandra.dht.RingPosition) + * +- * @param searchToken the token the natural endpoints are requested for ++ * @param tokenMetadata the token metadata used to find the searchToken, e.g. contains token to endpoint ++ * mapping information ++ * @param searchToken the token to find the natural endpoints for + * @return a copy of the natural endpoints for the given token + */ + public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata); +diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java +index 2dd835c2612..748d2d3238a 100644 +--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java ++++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java +@@ -68,10 +68,7 @@ public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata met + replicas.add(new Replica(ep, replicaRange, replicas.size() < rf.fullReplicas)); + } + +- // group endpoints by DC, so that we can cheaply filter them to a given DC +- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); +- return replicas.build() +- .sorted(Comparator.comparing(r -> snitch.getDatacenter(r.endpoint()))); ++ return replicas.build(); + } + + public ReplicationFactor getReplicationFactor() +diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java +index 338e752d407..507cc1fe5b7 100644 +--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java ++++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java +@@ -27,7 +27,7 @@ + import com.google.common.collect.HashMultimap; + import com.google.common.collect.Lists; + import com.google.common.collect.Multimap; +-import org.junit.Assert; ++import org.junit.Before; + import org.junit.BeforeClass; + import org.junit.Test; + +@@ -56,15 +56,23 @@ + public class SimpleStrategyTest + { + public static final String KEYSPACE1 = "SimpleStrategyTest"; ++ public static final String MULTIDC = "MultiDCSimpleStrategyTest"; + + @BeforeClass +- public static void defineSchema() throws Exception ++ public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1)); ++ SchemaLoader.createKeyspace(MULTIDC, KeyspaceParams.simple(3)); + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + } + ++ @Before ++ public void resetSnitch() ++ { ++ DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch()); ++ } ++ + @Test + public void tryValidKeyspace() + { +@@ -97,6 +105,47 @@ public void testStringEndpoints() throws UnknownHostException + verifyGetNaturalEndpoints(endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0])); + } + ++ @Test ++ public void testMultiDCSimpleStrategyEndpoints() throws UnknownHostException ++ { ++ IEndpointSnitch snitch = new PropertyFileSnitch(); ++ DatabaseDescriptor.setEndpointSnitch(snitch); ++ ++ TokenMetadata metadata = new TokenMetadata(); ++ ++ AbstractReplicationStrategy strategy = getStrategy(MULTIDC, metadata, snitch); ++ ++ // Topology taken directly from the topology_test.test_size_estimates_multidc dtest that regressed ++ Multimap<InetAddressAndPort, Token> dc1 = HashMultimap.create(); ++ dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new Murmur3Partitioner.LongToken(-6639341390736545756L)); ++ dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new Murmur3Partitioner.LongToken(-2688160409776496397L)); ++ dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new Murmur3Partitioner.LongToken(-2506475074448728501L)); ++ dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new Murmur3Partitioner.LongToken(8473270337963525440L)); ++ metadata.updateNormalTokens(dc1); ++ ++ Multimap<InetAddressAndPort, Token> dc2 = HashMultimap.create(); ++ dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new Murmur3Partitioner.LongToken(-3736333188524231709L)); ++ dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new Murmur3Partitioner.LongToken(8673615181726552074L)); ++ metadata.updateNormalTokens(dc2); ++ ++ Map<InetAddressAndPort, Integer> primaryCount = new HashMap<>(); ++ Map<InetAddressAndPort, Integer> replicaCount = new HashMap<>(); ++ for (Token t : metadata.sortedTokens()) ++ { ++ EndpointsForToken replicas = strategy.getNaturalReplicasForToken(t); ++ primaryCount.compute(replicas.get(0).endpoint(), (k, v) -> (v == null) ? 1 : v + 1); ++ for (Replica replica : replicas) ++ replicaCount.compute(replica.endpoint(), (k, v) -> (v == null) ? 1 : v + 1); ++ } ++ ++ // All three hosts should have 2 "primary" replica ranges and 6 total ranges with RF=3, 3 nodes and 2 DCs. ++ for (InetAddressAndPort addr : primaryCount.keySet()) ++ { ++ assertEquals(2, (int) primaryCount.get(addr)); ++ assertEquals(6, (int) replicaCount.get(addr)); ++ } ++ } ++ + // given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens, + // make sure that the Strategy picks the right endpoints for the keys. + private void verifyGetNaturalEndpoints(Token[] endpointTokens, Token[] keyTokens) throws UnknownHostException +@@ -106,7 +155,7 @@ private void verifyGetNaturalEndpoints(Token[] endpointTokens, Token[] keyTokens + for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) + { + tmd = new TokenMetadata(); +- strategy = getStrategy(keyspaceName, tmd); ++ strategy = getStrategy(keyspaceName, tmd, new SimpleSnitch()); + List<InetAddressAndPort> hosts = new ArrayList<>(); + for (int i = 0; i < endpointTokens.length; i++) + { +@@ -160,7 +209,7 @@ public void testGetEndpointsDuringBootstrap() throws UnknownHostException + AbstractReplicationStrategy strategy = null; + for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) + { +- strategy = getStrategy(keyspaceName, tmd); ++ strategy = getStrategy(keyspaceName, tmd, new SimpleSnitch()); + + PendingRangeCalculatorService.calculatePendingRanges(strategy, keyspaceName); + +@@ -238,14 +287,14 @@ public void transientReplica() throws Exception + strategy.getNaturalReplicasForToken(tk(101))); + } + +- private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd) ++ private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd, IEndpointSnitch snitch) + { + KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName); + return AbstractReplicationStrategy.createReplicationStrategy( + keyspaceName, + ksmd.params.replication.klass, + tmd, +- new SimpleSnitch(), ++ snitch, + ksmd.params.replication.options); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/507e4a46/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index 818e20e..deb43c6 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -132,11 +132,19 @@ public abstract class AbstractReplicationStrategy } /** - * calculate the natural endpoints for the given token + * Calculate the natural endpoints for the given token. Endpoints are returned in the order + * they occur in the ring following the searchToken, as defined by the replication strategy. + * + * Note that the order of the replicas is _implicitly relied upon_ by the definition of + * "primary" range in + * {@link org.apache.cassandra.service.StorageService#getPrimaryRangesForEndpoint(String, InetAddressAndPort)} + * which is in turn relied on by various components like repair and size estimate calculations. * * @see #getNaturalReplicasForToken(org.apache.cassandra.dht.RingPosition) * - * @param searchToken the token the natural endpoints are requested for + * @param tokenMetadata the token metadata used to find the searchToken, e.g. contains token to endpoint + * mapping information + * @param searchToken the token to find the natural endpoints for * @return a copy of the natural endpoints for the given token */ public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/507e4a46/src/java/org/apache/cassandra/locator/SimpleStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java index 2dd835c..748d2d3 100644 --- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java +++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java @@ -68,10 +68,7 @@ public class SimpleStrategy extends AbstractReplicationStrategy replicas.add(new Replica(ep, replicaRange, replicas.size() < rf.fullReplicas)); } - // group endpoints by DC, so that we can cheaply filter them to a given DC - IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - return replicas.build() - .sorted(Comparator.comparing(r -> snitch.getDatacenter(r.endpoint()))); + return replicas.build(); } public ReplicationFactor getReplicationFactor() http://git-wip-us.apache.org/repos/asf/cassandra/blob/507e4a46/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java index 338e752..507cc1f 100644 --- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java @@ -27,7 +27,7 @@ import java.util.Map; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; -import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -56,15 +56,23 @@ import static org.junit.Assert.assertTrue; public class SimpleStrategyTest { public static final String KEYSPACE1 = "SimpleStrategyTest"; + public static final String MULTIDC = "MultiDCSimpleStrategyTest"; @BeforeClass - public static void defineSchema() throws Exception + public static void defineSchema() { SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1)); + SchemaLoader.createKeyspace(MULTIDC, KeyspaceParams.simple(3)); DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); } + @Before + public void resetSnitch() + { + DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch()); + } + @Test public void tryValidKeyspace() { @@ -97,6 +105,47 @@ public class SimpleStrategyTest verifyGetNaturalEndpoints(endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0])); } + @Test + public void testMultiDCSimpleStrategyEndpoints() throws UnknownHostException + { + IEndpointSnitch snitch = new PropertyFileSnitch(); + DatabaseDescriptor.setEndpointSnitch(snitch); + + TokenMetadata metadata = new TokenMetadata(); + + AbstractReplicationStrategy strategy = getStrategy(MULTIDC, metadata, snitch); + + // Topology taken directly from the topology_test.test_size_estimates_multidc dtest that regressed + Multimap<InetAddressAndPort, Token> dc1 = HashMultimap.create(); + dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new Murmur3Partitioner.LongToken(-6639341390736545756L)); + dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new Murmur3Partitioner.LongToken(-2688160409776496397L)); + dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new Murmur3Partitioner.LongToken(-2506475074448728501L)); + dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new Murmur3Partitioner.LongToken(8473270337963525440L)); + metadata.updateNormalTokens(dc1); + + Multimap<InetAddressAndPort, Token> dc2 = HashMultimap.create(); + dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new Murmur3Partitioner.LongToken(-3736333188524231709L)); + dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new Murmur3Partitioner.LongToken(8673615181726552074L)); + metadata.updateNormalTokens(dc2); + + Map<InetAddressAndPort, Integer> primaryCount = new HashMap<>(); + Map<InetAddressAndPort, Integer> replicaCount = new HashMap<>(); + for (Token t : metadata.sortedTokens()) + { + EndpointsForToken replicas = strategy.getNaturalReplicasForToken(t); + primaryCount.compute(replicas.get(0).endpoint(), (k, v) -> (v == null) ? 1 : v + 1); + for (Replica replica : replicas) + replicaCount.compute(replica.endpoint(), (k, v) -> (v == null) ? 1 : v + 1); + } + + // All three hosts should have 2 "primary" replica ranges and 6 total ranges with RF=3, 3 nodes and 2 DCs. + for (InetAddressAndPort addr : primaryCount.keySet()) + { + assertEquals(2, (int) primaryCount.get(addr)); + assertEquals(6, (int) replicaCount.get(addr)); + } + } + // given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens, // make sure that the Strategy picks the right endpoints for the keys. private void verifyGetNaturalEndpoints(Token[] endpointTokens, Token[] keyTokens) throws UnknownHostException @@ -106,7 +155,7 @@ public class SimpleStrategyTest for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { tmd = new TokenMetadata(); - strategy = getStrategy(keyspaceName, tmd); + strategy = getStrategy(keyspaceName, tmd, new SimpleSnitch()); List<InetAddressAndPort> hosts = new ArrayList<>(); for (int i = 0; i < endpointTokens.length; i++) { @@ -160,7 +209,7 @@ public class SimpleStrategyTest AbstractReplicationStrategy strategy = null; for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { - strategy = getStrategy(keyspaceName, tmd); + strategy = getStrategy(keyspaceName, tmd, new SimpleSnitch()); PendingRangeCalculatorService.calculatePendingRanges(strategy, keyspaceName); @@ -238,14 +287,14 @@ public class SimpleStrategyTest strategy.getNaturalReplicasForToken(tk(101))); } - private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd) + private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd, IEndpointSnitch snitch) { KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName); return AbstractReplicationStrategy.createReplicationStrategy( keyspaceName, ksmd.params.replication.klass, tmd, - new SimpleSnitch(), + snitch, ksmd.params.replication.options); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org