Repository: cassandra Updated Branches: refs/heads/trunk b453f0897 -> ae03e1bab
Fix race condition during pending range calculation Patch by Josh McKenzie; reviewed by Tyler Hobbs for CASSANDRA-7390 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae03e1ba Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae03e1ba Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae03e1ba Branch: refs/heads/trunk Commit: ae03e1bab709bcadcd973899be04e0c06a7df7a9 Parents: b453f08 Author: Josh McKenzie <[email protected]> Authored: Wed Jul 9 15:51:51 2014 -0500 Committer: Tyler Hobbs <[email protected]> Committed: Wed Jul 9 15:51:51 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/locator/TokenMetadata.java | 129 +++++++++++++++- .../service/PendingRangeCalculatorService.java | 154 +++---------------- .../org/apache/cassandra/service/MoveTest.java | 15 +- 4 files changed, 159 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae03e1ba/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ff0a1c6..f2cd844 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) * Shorten SSTable path (CASSANDRA-6962) * Use unsafe mutations for most unit tests (CASSANDRA-6969) + * Fix race condition during calculation of pending ranges (CASSANDRA-7390) 2.1.1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae03e1ba/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index f41b1e7..c4e3542 100644 --- 
a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -752,9 +752,123 @@ public class TokenMetadata return ranges; } - public void setPendingRanges(String keyspaceName, Multimap<Range<Token>, InetAddress> rangeMap) + /** + * Calculate pending ranges for the given keyspace according to bootstrapping, leaving, moving and relocating nodes. Reasoning is: + * + * (1) When in doubt, it is better to write too much to a node than too little. That is, if + * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning + * up unneeded data afterwards is better than missing writes during movement. + * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional + * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore + * we will first remove _all_ leaving tokens for the sake of calculation and then check what + * ranges would go where if all nodes are to leave. This way we get the biggest possible + * ranges with regard to current leave operations, covering all subsets of possible final range + * values. + * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing + * complex calculations to see if multiple bootstraps overlap, we simply base calculations + * on the same token ring used before (reflecting situation after all leave operations have + * completed). Bootstrapping nodes will be added and removed one by one to that metadata and + * checked what their ranges would be. This will give us the biggest possible ranges the + * node could have. It might be that other bootstraps make our actual final ranges smaller, + * but it does not matter as we can clean up the data afterwards. Moving and relocating nodes are handled the same way. + * + * NOTE: This is a heavy and inefficient operation. This will be done only once when a node + * changes state in the cluster, so it should be manageable.
+ */ + public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName) { - pendingRanges.put(keyspaceName, rangeMap); + lock.readLock().lock(); // held for the whole calculation so writers (which take lock.writeLock()) cannot change the metadata mid-calculation + try + { + Multimap<Range<Token>, InetAddress> newPendingRanges = HashMultimap.create(); + + if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty() && relocatingTokens.isEmpty()) + { + if (logger.isDebugEnabled()) + logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName); + + pendingRanges.put(keyspaceName, newPendingRanges); + return; + } + + Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(); + + // Copy of metadata reflecting the situation after all leave operations are finished. + TokenMetadata allLeftMetadata = cloneAfterAllLeft(); + + // get all ranges that will be affected by leaving nodes + Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>(); + for (InetAddress endpoint : leavingEndpoints) + affectedRanges.addAll(addressRanges.get(endpoint)); + + // for each of those ranges, find what new nodes will be responsible for the range when + // all leaving nodes are gone. + for (Range<Token> range : affectedRanges) + { + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, cloneOnlyTokenMap())); + Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + newPendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints)); + } + + // At this stage newPendingRanges has been updated according to leave operations. We can + // now continue the calculation by checking bootstrapping nodes. + + // For each of the bootstrapping nodes, simply add and remove them one by one to + // allLeftMetadata and check in between what their ranges would be.
+ Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse(); + for (InetAddress endpoint : bootstrapAddresses.keySet()) + { + Collection<Token> tokens = bootstrapAddresses.get(endpoint); + + allLeftMetadata.updateNormalTokens(tokens, endpoint); + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + newPendingRanges.put(range, endpoint); + allLeftMetadata.removeEndpoint(endpoint); + } + + // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. + // We can now finish the calculation by checking moving and relocating nodes. + + // For each of the moving nodes, we do the same thing we did for bootstrapping: + // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. + for (Pair<Token, InetAddress> moving : movingEndpoints) + { + InetAddress endpoint = moving.right; // address of the moving node + + // moving.left is a new token of the endpoint + allLeftMetadata.updateNormalToken(moving.left, endpoint); + + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + { + newPendingRanges.put(range, endpoint); + } + + allLeftMetadata.removeEndpoint(endpoint); + } + + // Ranges being relocated. + for (Map.Entry<Token, InetAddress> relocating : relocatingTokens.entrySet()) + { + InetAddress endpoint = relocating.getValue(); // address of the node the relocated token is assigned to + Token token = relocating.getKey(); + + allLeftMetadata.updateNormalToken(token, endpoint); + + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + newPendingRanges.put(range, endpoint); + + allLeftMetadata.removeEndpoint(endpoint); + } + + pendingRanges.put(keyspaceName, newPendingRanges); + // Report on this keyspace's result: the pendingRanges field was just written above, so it is never empty here. + if (logger.isDebugEnabled()) + logger.debug("Pending ranges:\n{}", (newPendingRanges.isEmpty() ? 
"<empty>" : printPendingRanges())); + } + finally + { + lock.readLock().unlock(); + } } public Token getPredecessor(Token token) @@ -906,12 +1020,15 @@ public class TokenMetadata lock.writeLock().lock(); try { - bootstrapTokens.clear(); tokenToEndpointMap.clear(); - topology.clear(); + endpointToHostIdMap.clear(); + bootstrapTokens.clear(); leavingEndpoints.clear(); pendingRanges.clear(); - endpointToHostIdMap.clear(); + movingEndpoints.clear(); + relocatingTokens.clear(); + sortedTokens.clear(); + topology.clear(); invalidateCachedRings(); } finally @@ -978,7 +1095,7 @@ public class TokenMetadata return sb.toString(); } - public String printPendingRanges() + private String printPendingRanges() { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae03e1ba/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index 74624d2..2276c4a 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -18,30 +18,16 @@ package org.apache.cassandra.service; -import org.apache.cassandra.utils.BiMultiValMap; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; - import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.AbstractReplicationStrategy; -import
org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.Collection; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; public class PendingRangeCalculatorService { @@ -51,9 +37,18 @@ public class PendingRangeCalculatorService private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal"); + private final AtomicInteger updateJobs = new AtomicInteger(0); // update() calls whose task has neither finished nor been rejected; polled by blockUntilFinished() + public PendingRangeCalculatorService() { - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + executor.setRejectedExecutionHandler(new RejectedExecutionHandler() + { + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) + { + PendingRangeCalculatorService.instance.finishUpdate(); // a rejected task never runs, so undo the increment made by update() + } + } + ); } private static class PendingRangeTask implements Runnable @@ -65,21 +60,27 @@ public class PendingRangeCalculatorService { calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName); } + PendingRangeCalculatorService.instance.finishUpdate(); // NOTE(review): not in a finally block; if a calculation above throws, updateJobs is never decremented and blockUntilFinished() will not return logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start); } } - public Future<?> update() + private void finishUpdate() { - return executor.submit(new PendingRangeTask()); + updateJobs.decrementAndGet(); + } + + public void update() + { + updateJobs.incrementAndGet(); + executor.submit(new PendingRangeTask()); } public void blockUntilFinished() { - while (true) + // We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count + while (updateJobs.get() > 0) { - if (executor.getActiveCount()
+ executor.getPendingTasks() == 0) - break; try { Thread.sleep(100); @@ -91,117 +92,10 @@ public class PendingRangeCalculatorService } } - /** - * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is: - * - * (1) When in doubt, it is better to write too much to a node than too little. That is, if - * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning - * up unneeded data afterwards is better than missing writes during movement. - * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional - * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore - * we will first remove _all_ leaving tokens for the sake of calculation and then check what - * ranges would go where if all nodes are to leave. This way we get the biggest possible - * ranges with regard current leave operations, covering all subsets of possible final range - * values. - * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing - * complex calculations to see if multiple bootstraps overlap, we simply base calculations - * on the same token ring used before (reflecting situation after all leave operations have - * completed). Bootstrapping nodes will be added and removed one by one to that metadata and - * checked what their ranges would be. This will give us the biggest possible ranges the - * node could have. It might be that other bootstraps make our actual final ranges smaller, - * but it does not matter as we can clean up the data afterwards. - * - * NOTE: This is heavy and ineffective operation. This will be done only once when a node - * changes state in the cluster, so it should be manageable.
- */ + // public & static for testing purposes; the calculation itself is done by TokenMetadata.calculatePendingRanges while holding the token metadata read lock public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName) { - TokenMetadata tm = StorageService.instance.getTokenMetadata(); - Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create(); - BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens(); - Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints(); - - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty()) - { - if (logger.isDebugEnabled()) - logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName); - tm.setPendingRanges(keyspaceName, pendingRanges); - return; - } - - Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(); - - // Copy of metadata reflecting the situation after all leave operations are finished. - TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft(); - - // get all ranges that will be affected by leaving nodes - Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>(); - for (InetAddress endpoint : leavingEndpoints) - affectedRanges.addAll(addressRanges.get(endpoint)); - - // for each of those ranges, find what new nodes will be responsible for the range when - // all leaving nodes are gone. - for (Range<Token> range : affectedRanges) - { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap())); - Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints)); - } - - // At this stage pendingRanges has been updated according to leave operations. We can - // now continue the calculation by checking bootstrapping nodes.
- - // For each of the bootstrapping nodes, simply add and remove them one by one to - // allLeftMetadata and check in between what their ranges would be. - Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse(); - for (InetAddress endpoint : bootstrapAddresses.keySet()) - { - Collection<Token> tokens = bootstrapAddresses.get(endpoint); - - allLeftMetadata.updateNormalTokens(tokens, endpoint); - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - pendingRanges.put(range, endpoint); - allLeftMetadata.removeEndpoint(endpoint); - } - - // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving and relocating nodes. - - // For each of the moving nodes, we do the same thing we did for bootstrapping: - // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. - for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints()) - { - InetAddress endpoint = moving.right; // address of the moving node - - // moving.left is a new token of the endpoint - allLeftMetadata.updateNormalToken(moving.left, endpoint); - - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - { - pendingRanges.put(range, endpoint); - } - - allLeftMetadata.removeEndpoint(endpoint); - } - - // Ranges being relocated.
- for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet()) - { - InetAddress endpoint = relocating.getValue(); // address of the moving node - Token token = relocating.getKey(); - - allLeftMetadata.updateNormalToken(token, endpoint); - - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - pendingRanges.put(range, endpoint); - - allLeftMetadata.removeEndpoint(endpoint); - } - - tm.setPendingRanges(keyspaceName, pendingRanges); - - if (logger.isDebugEnabled()) - logger.debug("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges())); + StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae03e1ba/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index c01f4af..1ee71dd 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -29,6 +29,7 @@ import static org.junit.Assert.*; import org.apache.cassandra.gms.Gossiper; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -73,6 +74,13 @@ public class MoveTest StorageService.instance.setPartitionerUnsafe(oldPartitioner); } + @Before + public void clearTokenMetadata() + { + PendingRangeCalculatorService.instance.blockUntilFinished(); // otherwise an in-flight calculation could write pending ranges back after the clear below + StorageService.instance.getTokenMetadata().clearUnsafe(); + } + /* * Test whether write endpoints is correct when the node is moving. Uses * StorageService.onChange and does not manipulate token metadata directly.
@@ -85,7 +93,6 @@ public class MoveTest final int MOVING_NODE = 3; // index of the moving node TokenMetadata tmd = ss.getTokenMetadata(); - tmd.clearUnsafe(); VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); ArrayList<Token> endpointTokens = new ArrayList<Token>(); @@ -141,7 +148,7 @@ public class MoveTest numMoved++; } } - assertEquals("mismatched number of moved token", numMoved, 1); + assertEquals("mismatched number of moved token", 1, numMoved); } // moving endpoint back to the normal state @@ -157,7 +164,6 @@ public class MoveTest StorageService ss = StorageService.instance; final int RING_SIZE = 10; TokenMetadata tmd = ss.getTokenMetadata(); - tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); @@ -195,6 +201,8 @@ public class MoveTest ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)))); + PendingRangeCalculatorService.instance.blockUntilFinished(); // wait until no pending-range calculation is in flight before setting up boot2 + InetAddress boot2 = InetAddress.getByName("127.0.1.2"); Gossiper.instance.initializeNodeUnsafe(boot2, UUID.randomUUID(), 1); Gossiper.instance.injectApplicationState(boot2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(7)))); @@ -498,7 +506,6 @@ public class MoveTest { StorageService ss = StorageService.instance; TokenMetadata tmd = ss.getTokenMetadata(); - tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
