Repository: cassandra
Updated Branches:
  refs/heads/trunk b453f0897 -> ae03e1bab


Fix race condition during pending range calculation

Patch by Josh McKenzie; reviewed by Tyler Hobbs for CASSANDRA-7390


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae03e1ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae03e1ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae03e1ba

Branch: refs/heads/trunk
Commit: ae03e1bab709bcadcd973899be04e0c06a7df7a9
Parents: b453f08
Author: Josh McKenzie <[email protected]>
Authored: Wed Jul 9 15:51:51 2014 -0500
Committer: Tyler Hobbs <[email protected]>
Committed: Wed Jul 9 15:51:51 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 129 +++++++++++++++-
 .../service/PendingRangeCalculatorService.java  | 154 +++----------------
 .../org/apache/cassandra/service/MoveTest.java  |  15 +-
 4 files changed, 159 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
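In broad strokes: before this patch the pending range calculation was a static method on PendingRangeCalculatorService that pulled the bootstrap tokens, leaving endpoints, moving endpoints and relocating ranges out of TokenMetadata through separate getters, computed the pending ranges, and then wrote the result back with setPendingRanges(). The calculation as a whole never ran under a single hold of TokenMetadata's lock, so ring changes could interleave with it. The patch moves the whole computation into TokenMetadata.calculatePendingRanges(), which reads the ring state and publishes its result while holding the metadata's read lock, and it gives the calculator service an explicit count of in-flight jobs so blockUntilFinished() reliably waits for submitted work. The following is a minimal sketch of the compute-then-publish shape of the new method; the names and types are simplified stand-ins for illustration, not the committed API:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Simplified stand-in for the new TokenMetadata.calculatePendingRanges():
    // read the ring state and publish the result under the same lock that
    // guards that state, building the answer in a fresh map so readers never
    // observe a half-finished calculation.
    public class PendingRangesSketch
    {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final ConcurrentMap<String, Map<String, String>> pendingRanges = new ConcurrentHashMap<>();

        public void calculatePendingRanges(String keyspaceName)
        {
            lock.readLock().lock();
            try
            {
                Map<String, String> newPendingRanges = new HashMap<>();
                // ... derive newPendingRanges from bootstrapping/leaving/moving
                // state here, as the real method does ...
                pendingRanges.put(keyspaceName, newPendingRanges); // publish in one step
            }
            finally
            {
                lock.readLock().unlock();
            }
        }
    }

Building the result in a fresh map and installing it with a single put means concurrent readers of pendingRanges only ever see a complete calculation.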


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae03e1ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff0a1c6..f2cd844 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
  * Shorten SSTable path (CASSANDRA-6962)
  * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 
 
 2.1.1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae03e1ba/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index f41b1e7..c4e3542 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -752,9 +752,123 @@ public class TokenMetadata
         return ranges;
     }
 
-    public void setPendingRanges(String keyspaceName, Multimap<Range<Token>, InetAddress> rangeMap)
+     /**
+     * Calculate pending ranges according to bootstrapping and leaving nodes. Reasoning is:
+     *
+     * (1) When in doubt, it is better to write too much to a node than too little. That is, if
+     * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
+     * up unneeded data afterwards is better than missing writes during movement.
+     * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
+     * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
+     * we will first remove _all_ leaving tokens for the sake of calculation and then check what
+     * ranges would go where if all nodes are to leave. This way we get the biggest possible
+     * ranges with regard to current leave operations, covering all subsets of possible final range
+     * values.
+     * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
+     * complex calculations to see if multiple bootstraps overlap, we simply base calculations
+     * on the same token ring used before (reflecting the situation after all leave operations have
+     * completed). Bootstrapping nodes are added to and removed from that metadata one by one, and
+     * we check in between what their ranges would be. This gives us the biggest possible ranges the
+     * node could have. It might be that other bootstraps make our actual final ranges smaller,
+     * but it does not matter as we can clean up the data afterwards.
+     *
+     * NOTE: This is a heavy and inefficient operation. It is only performed when a node
+     * changes state in the cluster, so it should be manageable.
+     */
+    public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
-        pendingRanges.put(keyspaceName, rangeMap);
+        lock.readLock().lock();
+        try
+        {
+            Multimap<Range<Token>, InetAddress> newPendingRanges = HashMultimap.create();
+
+            if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty() && relocatingTokens.isEmpty())
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName);
+
+                pendingRanges.put(keyspaceName, newPendingRanges);
+                return;
+            }
+
+            Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
+
+            // Copy of metadata reflecting the situation after all leave operations are finished.
+            TokenMetadata allLeftMetadata = cloneAfterAllLeft();
+
+            // get all ranges that will be affected by leaving nodes
+            Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+            for (InetAddress endpoint : leavingEndpoints)
+                affectedRanges.addAll(addressRanges.get(endpoint));
+
+            // for each of those ranges, find what new nodes will be responsible for the range when
+            // all leaving nodes are gone.
+            for (Range<Token> range : affectedRanges)
+            {
+                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, cloneOnlyTokenMap()));
+                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+                newPendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
+            }
+
+            // At this stage newPendingRanges has been updated according to leave operations. We can
+            // now continue the calculation by checking bootstrapping nodes.
+
+            // For each of the bootstrapping nodes, simply add and remove them one by one to
+            // allLeftMetadata and check in between what their ranges would be.
+            Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
+            for (InetAddress endpoint : bootstrapAddresses.keySet())
+            {
+                Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+
+                allLeftMetadata.updateNormalTokens(tokens, endpoint);
+                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+                    newPendingRanges.put(range, endpoint);
+                allLeftMetadata.removeEndpoint(endpoint);
+            }
+
+            // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
+            // We can now finish the calculation by checking moving and relocating nodes.
+
+            // For each of the moving nodes, we do the same thing we did for bootstrapping:
+            // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+            for (Pair<Token, InetAddress> moving : movingEndpoints)
+            {
+                InetAddress endpoint = moving.right; // address of the moving node
+
+                //  moving.left is a new token of the endpoint
+                allLeftMetadata.updateNormalToken(moving.left, endpoint);
+
+                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+                {
+                    newPendingRanges.put(range, endpoint);
+                }
+
+                allLeftMetadata.removeEndpoint(endpoint);
+            }
+
+            // Ranges being relocated.
+            for (Map.Entry<Token, InetAddress> relocating : relocatingTokens.entrySet())
+            {
+                InetAddress endpoint = relocating.getValue(); // address of the moving node
+                Token token = relocating.getKey();
+
+                allLeftMetadata.updateNormalToken(token, endpoint);
+
+                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+                    newPendingRanges.put(range, endpoint);
+
+                allLeftMetadata.removeEndpoint(endpoint);
+            }
+
+            pendingRanges.put(keyspaceName, newPendingRanges);
+
+            if (logger.isDebugEnabled())
+                logger.debug("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
     }
 
     public Token getPredecessor(Token token)
@@ -906,12 +1020,15 @@ public class TokenMetadata
         lock.writeLock().lock();
         try
         {
-            bootstrapTokens.clear();
             tokenToEndpointMap.clear();
-            topology.clear();
+            endpointToHostIdMap.clear();
+            bootstrapTokens.clear();
             leavingEndpoints.clear();
             pendingRanges.clear();
-            endpointToHostIdMap.clear();
+            movingEndpoints.clear();
+            relocatingTokens.clear();
+            sortedTokens.clear();
+            topology.clear();
             invalidateCachedRings();
         }
         finally
@@ -978,7 +1095,7 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    public String printPendingRanges()
+    private String printPendingRanges()
     {
         StringBuilder sb = new StringBuilder();
 
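The Javadoc carried over into TokenMetadata boils the leave-handling step down to a set difference: for each affected range, the pending endpoints are exactly the nodes that are natural replicas once all leaving nodes are removed but are not natural replicas now. Below is a toy, self-contained illustration of that step, with plain strings standing in for InetAddress and Range and made-up replica sets:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Toy illustration of the leave-handling step in calculatePendingRanges():
    // pending endpoints for a range = (replicas after all leaves) \ (replicas now).
    public class PendingByLeaveExample
    {
        public static void main(String[] args)
        {
            // Natural replicas for one affected range while node C is still in the ring...
            Set<String> currentEndpoints = new HashSet<>(Arrays.asList("A", "B", "C"));
            // ...and once every leaving node (here, C) has been removed.
            Set<String> newEndpoints = new HashSet<>(Arrays.asList("A", "B", "D"));

            // The difference is what must start receiving writes for this range now.
            Set<String> pending = new HashSet<>(newEndpoints);
            pending.removeAll(currentEndpoints);

            System.out.println("pending endpoints: " + pending); // prints [D]
        }
    }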

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae03e1ba/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 74624d2..2276c4a 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -18,30 +18,16 @@
 
 package org.apache.cassandra.service;
 
-import org.apache.cassandra.utils.BiMultiValMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetAddress;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.Collection;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class PendingRangeCalculatorService
 {
@@ -51,9 +37,18 @@ public class PendingRangeCalculatorService
     private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 
+    private AtomicInteger updateJobs = new AtomicInteger(0);
+
     public PendingRangeCalculatorService()
     {
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+        executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
+        {
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
+            {
+                PendingRangeCalculatorService.instance.finishUpdate();
+            }
+        }
+        );
     }
 
     private static class PendingRangeTask implements Runnable
@@ -65,21 +60,27 @@ public class PendingRangeCalculatorService
             {
                 calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
             }
+            PendingRangeCalculatorService.instance.finishUpdate();
             logger.debug("finished calculation for {} keyspaces in {}ms", 
Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - 
start);
         }
     }
 
-    public Future<?> update()
+    private void finishUpdate()
     {
-        return executor.submit(new PendingRangeTask());
+        updateJobs.decrementAndGet();
+    }
+
+    public void update()
+    {
+        updateJobs.incrementAndGet();
+        executor.submit(new PendingRangeTask());
     }
 
     public void blockUntilFinished()
     {
-        while (true)
+        // We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count
+        while (updateJobs.get() > 0)
         {
-            if (executor.getActiveCount() + executor.getPendingTasks() == 0)
-                break;
             try
             {
                 Thread.sleep(100);
@@ -91,117 +92,10 @@ public class PendingRangeCalculatorService
         }
     }
 
-    /**
-     * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
-     *
-     * (1) When in doubt, it is better to write too much to a node than too little. That is, if
-     * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
-     * up unneeded data afterwards is better than missing writes during movement.
-     * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
-     * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
-     * we will first remove _all_ leaving tokens for the sake of calculation and then check what
-     * ranges would go where if all nodes are to leave. This way we get the biggest possible
-     * ranges with regard current leave operations, covering all subsets of possible final range
-     * values.
-     * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
-     * complex calculations to see if multiple bootstraps overlap, we simply base calculations
-     * on the same token ring used before (reflecting situation after all leave operations have
-     * completed). Bootstrapping nodes will be added and removed one by one to that metadata and
-     * checked what their ranges would be. This will give us the biggest possible ranges the
-     * node could have. It might be that other bootstraps make our actual final ranges smaller,
-     * but it does not matter as we can clean up the data afterwards.
-     *
-     * NOTE: This is heavy and ineffective operation. This will be done only once when a node
-     * changes state in the cluster, so it should be manageable.
-     */
+
     // public & static for testing purposes
     public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
-        TokenMetadata tm = StorageService.instance.getTokenMetadata();
-        Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
-        BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
-        Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
-
-        if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty())
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName);
-            tm.setPendingRanges(keyspaceName, pendingRanges);
-            return;
-        }
-
-        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
-        // Copy of metadata reflecting the situation after all leave operations are finished.
-        TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
-
-        // get all ranges that will be affected by leaving nodes
-        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-        for (InetAddress endpoint : leavingEndpoints)
-            affectedRanges.addAll(addressRanges.get(endpoint));
-
-        // for each of those ranges, find what new nodes will be responsible for the range when
-        // all leaving nodes are gone.
-        for (Range<Token> range : affectedRanges)
-        {
-            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap()));
-            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-            pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
-        }
-
-        // At this stage pendingRanges has been updated according to leave operations. We can
-        // now continue the calculation by checking bootstrapping nodes.
-
-        // For each of the bootstrapping nodes, simply add and remove them one by one to
-        // allLeftMetadata and check in between what their ranges would be.
-        Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
-        for (InetAddress endpoint : bootstrapAddresses.keySet())
-        {
-            Collection<Token> tokens = bootstrapAddresses.get(endpoint);
-
-            allLeftMetadata.updateNormalTokens(tokens, endpoint);
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                pendingRanges.put(range, endpoint);
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
-        // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
-        // We can now finish the calculation by checking moving and relocating nodes.
-
-        // For each of the moving nodes, we do the same thing we did for bootstrapping:
-        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
-        for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
-        {
-            InetAddress endpoint = moving.right; // address of the moving node
-
-            //  moving.left is a new token of the endpoint
-            allLeftMetadata.updateNormalToken(moving.left, endpoint);
-
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-            {
-                pendingRanges.put(range, endpoint);
-            }
-
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
-        // Ranges being relocated.
-        for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
-        {
-            InetAddress endpoint = relocating.getValue(); // address of the moving node
-            Token token = relocating.getKey();
-
-            allLeftMetadata.updateNormalToken(token, endpoint);
-
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                pendingRanges.put(range, endpoint);
-
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
-        tm.setPendingRanges(keyspaceName, pendingRanges);
-
-        if (logger.isDebugEnabled())
-            logger.debug("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
+        StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName);
     }
 }
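The other half of the fix is the bookkeeping in PendingRangeCalculatorService. As the comment added to blockUntilFinished() notes, the thread pool's active job count can't be trusted to tell whether the job being waited on has actually finished, so the service now counts outstanding updates itself, decrementing both when a task completes and when the bounded queue rejects one (otherwise a discarded task would leave the counter stuck and blockUntilFinished() spinning forever). A minimal sketch of that counting pattern follows, using a plain ThreadPoolExecutor in place of Cassandra's JMXEnabledThreadPoolExecutor:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of the "count outstanding jobs explicitly" pattern: update() bumps
    // the counter, and the counter is released both when a job finishes and when
    // the executor rejects it, so waiters poll the counter rather than executor stats.
    public class UpdateTracker
    {
        private final AtomicInteger updateJobs = new AtomicInteger(0);
        private final ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1));

        public UpdateTracker()
        {
            executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
            {
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
                {
                    // A rejected job will never run; release its slot so waiters don't hang.
                    updateJobs.decrementAndGet();
                }
            });
        }

        public void update(final Runnable work)
        {
            updateJobs.incrementAndGet();
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try { work.run(); }
                    finally { updateJobs.decrementAndGet(); }
                }
            });
        }

        public void blockUntilFinished() throws InterruptedException
        {
            while (updateJobs.get() > 0)
                Thread.sleep(100);
        }
    }

The MoveTest changes below lean on exactly this: each test first calls blockUntilFinished() and then clears the token metadata, so a calculation still running from a previous test does not race with the next test's setup.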

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae03e1ba/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index c01f4af..1ee71dd 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.*;
 
 import org.apache.cassandra.gms.Gossiper;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -73,6 +74,13 @@ public class MoveTest
         StorageService.instance.setPartitionerUnsafe(oldPartitioner);
     }
 
+    @Before
+    public void clearTokenMetadata()
+    {
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+        StorageService.instance.getTokenMetadata().clearUnsafe();
+    }
+
     /*
      * Test whether write endpoints is correct when the node is moving. Uses
      * StorageService.onChange and does not manipulate token metadata directly.
@@ -85,7 +93,6 @@ public class MoveTest
         final int MOVING_NODE = 3; // index of the moving node
 
         TokenMetadata tmd = ss.getTokenMetadata();
-        tmd.clearUnsafe();
         VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
 
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
@@ -141,7 +148,7 @@ public class MoveTest
                        numMoved++;
                 }
             }
-            assertEquals("mismatched number of moved token", numMoved, 1);
+            assertEquals("mismatched number of moved token", 1, numMoved);
         }
 
         // moving endpoint back to the normal state
@@ -157,7 +164,6 @@ public class MoveTest
         StorageService ss = StorageService.instance;
         final int RING_SIZE = 10;
         TokenMetadata tmd = ss.getTokenMetadata();
-        tmd.clearUnsafe();
         IPartitioner partitioner = new RandomPartitioner();
         VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
 
@@ -195,6 +201,8 @@ public class MoveTest
         ss.onChange(boot1,
                     ApplicationState.STATUS,
                     valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5))));
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+
         InetAddress boot2 = InetAddress.getByName("127.0.1.2");
         Gossiper.instance.initializeNodeUnsafe(boot2, UUID.randomUUID(), 1);
         Gossiper.instance.injectApplicationState(boot2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(7))));
@@ -498,7 +506,6 @@ public class MoveTest
     {
         StorageService ss = StorageService.instance;
         TokenMetadata tmd = ss.getTokenMetadata();
-        tmd.clearUnsafe();
         IPartitioner partitioner = new RandomPartitioner();
         VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
 
