Make batchlog replica selection rack-aware

patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6551
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af96d405 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af96d405 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af96d405 Branch: refs/heads/cassandra-2.1 Commit: af96d405b42a9e4ae23cba841b7a5d83ee8f7ec8 Parents: fab4557 Author: Jonathan Ellis <[email protected]> Authored: Fri May 2 22:47:27 2014 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Fri May 2 22:47:27 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../service/BatchlogEndpointSelector.java | 110 ++++++++++++++++++ .../apache/cassandra/service/StorageProxy.java | 21 +--- .../service/BatchlogEndpointSelectorTest.java | 116 +++++++++++++++++++ 4 files changed, 232 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9ab1a5f..5799659 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,5 @@ 2.0.8 -======= + * Make batchlog replica selection rack-aware (CASSANDRA-6551) * Add Google Compute Engine snitch (CASSANDRA-7132) * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072) * Set JMX RMI port to 7199 (CASSANDRA-7087) http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java new file mode 100644 index 0000000..bf032f5 --- /dev/null +++ b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.utils.FBUtilities; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + +public class BatchlogEndpointSelector +{ + private final String localRack; + + public BatchlogEndpointSelector(String localRack) + { + this.localRack = localRack; + } + + /** + * @param endpoints nodes in the local datacenter, grouped by rack name + * @return list of candidates for batchlog hosting. if possible these will be two nodes from different racks. 
+ */ + public Collection<InetAddress> chooseEndpoints(Multimap<String, InetAddress> endpoints) + { + // strip out dead endpoints and localhost + ListMultimap<String, InetAddress> validated = ArrayListMultimap.create(); + for (Map.Entry<String, InetAddress> entry : endpoints.entries()) + { + if (isValid(entry.getValue())) + validated.put(entry.getKey(), entry.getValue()); + } + if (validated.size() <= 2) + return validated.values(); + + if ((validated.size() - validated.get(localRack).size()) >= 2) + { + // we have enough endpoints in other racks + validated.removeAll(localRack); + } + + if (validated.keySet().size() == 1) + { + // we have only 1 `other` rack + Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values()); + return Lists.newArrayList(Iterables.limit(otherRack, 2)); + } + + // randomize which racks we pick from if more than 2 remaining + Collection<String> racks; + if (validated.keySet().size() == 2) + { + racks = validated.keySet(); + } + else + { + racks = Lists.newArrayList(validated.keySet()); + Collections.shuffle((List) racks); + } + + // grab a random member of up to two racks + List<InetAddress> result = new ArrayList<>(2); + for (String rack : Iterables.limit(racks, 2)) + { + List<InetAddress> rackMembers = validated.get(rack); + result.add(rackMembers.get(getRandomInt(rackMembers.size()))); + } + + return result; + } + + @VisibleForTesting + protected boolean isValid(InetAddress input) + { + return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input); + } + + @VisibleForTesting + protected int getRandomInt(int bound) + { + return FBUtilities.threadLocalRandom().nextInt(bound); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java index 14d5ee2..2bf8e7f 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; -import java.util.Random; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; @@ -729,24 +728,14 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException { TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); - List<InetAddress> localEndpoints = new ArrayList<>(topology.getDatacenterEndpoints().get(localDataCenter)); - + Multimap<String, InetAddress> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); + // special case for single-node datacenters if (localEndpoints.size() == 1) - return localEndpoints; - - List<InetAddress> chosenEndpoints = new ArrayList<>(2); - int startOffset = new Random().nextInt(localEndpoints.size()); + return localEndpoints.values(); - // starts at some random point in the list, advances forward until the end, then loops - // around to the beginning, advancing again until it is back at the starting point again. 
- for (int i = 0; i < localEndpoints.size() && chosenEndpoints.size() < 2; i++) - { - InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size()); - // skip localhost and non-alive nodes - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(endpoint)) - chosenEndpoints.add(endpoint); - } + String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress()); + Collection<InetAddress> chosenEndpoints = new BatchlogEndpointSelector(localRack).chooseEndpoints(localEndpoints); if (chosenEndpoints.isEmpty()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/af96d405/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java new file mode 100644 index 0000000..293078d --- /dev/null +++ b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; + +import org.junit.Test; +import org.junit.matchers.JUnitMatchers; + +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Multimap; + +public class BatchlogEndpointSelectorTest +{ + private final BatchlogEndpointSelector target; + private static final String LOCAL = "local"; + + + public BatchlogEndpointSelectorTest() throws UnknownHostException + { + target = new BatchlogEndpointSelector(LOCAL) + { + @Override + protected boolean isValid(InetAddress input) + { + //we will use always alive non-localhost endpoints + return true; + } + + @Override + protected int getRandomInt(int bound) + { + //we don't need a random behavior here + return bound - 1; + } + }; + } + + @Test + public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException + { + Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .put("1", InetAddress.getByName("11")) + .put("2", InetAddress.getByName("2")) + .put("2", InetAddress.getByName("22")) + .build(); + Collection<InetAddress> result = target.chooseEndpoints(endpoints); + assertThat(result.size(), is(2)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11"))); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22"))); + } + + @Test + public void shouldSelectHostFromLocal() throws UnknownHostException + { + Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .build(); + Collection<InetAddress> result = 
target.chooseEndpoints(endpoints); + assertThat(result.size(), is(2)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1"))); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); + } + + @Test + public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException + { + Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() + .put(LOCAL, InetAddress.getByName("0")) + .build(); + Collection<InetAddress> result = target.chooseEndpoints(endpoints); + assertThat(result.size(), is(1)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); + } + + @Test + public void shouldSelectTwoFirstHostsFromSingleOtherRack() throws UnknownHostException + { + Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .put("1", InetAddress.getByName("11")) + .put("1", InetAddress.getByName("111")) + .build(); + Collection<InetAddress> result = target.chooseEndpoints(endpoints); + assertThat(result.size(), is(2)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1"))); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11"))); + } +}
