Repository: cassandra
Updated Branches:
  refs/heads/trunk fafcfc787 -> da3fd5d7a


fix merge


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3fd5d7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3fd5d7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3fd5d7

Branch: refs/heads/trunk
Commit: da3fd5d7ac2bb0f64a32176c3722b857b71bc654
Parents: fafcfc7
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed Jul 30 22:07:10 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed Jul 30 22:08:40 2014 -0500

----------------------------------------------------------------------
 .../apache/cassandra/locator/TokenMetadata.java |  21 +-
 .../service/PendingRangeCalculatorService.java  |  73 +------
 .../ScheduledRangeTransferExecutorService.java  | 135 ------------
 .../apache/cassandra/service/RelocateTest.java  | 204 -------------------
 4 files changed, 4 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index bb5455c..2a6a624 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -701,10 +701,10 @@ public class TokenMetadata
         {
             Multimap<Range<Token>, InetAddress> newPendingRanges = 
HashMultimap.create();
 
-            if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && 
movingEndpoints.isEmpty() && relocatingTokens.isEmpty())
+            if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && 
movingEndpoints.isEmpty())
             {
                 if (logger.isDebugEnabled())
-                    logger.debug("No bootstrapping, leaving or moving nodes, 
and no relocating tokens -> empty pending ranges for {}", keyspaceName);
+                    logger.debug("No bootstrapping, leaving or moving nodes -> 
empty pending ranges for {}", keyspaceName);
 
                 pendingRanges.put(keyspaceName, newPendingRanges);
                 return;
@@ -746,7 +746,7 @@ public class TokenMetadata
             }
 
             // At this stage newPendingRanges has been updated according to 
leaving and bootstrapping nodes.
-            // We can now finish the calculation by checking moving and 
relocating nodes.
+            // We can now finish the calculation by checking moving nodes.
 
             // For each of the moving nodes, we do the same thing we did for 
bootstrapping:
             // simply add and remove them one by one to allLeftMetadata and 
check in between what their ranges would be.
@@ -765,20 +765,6 @@ public class TokenMetadata
                 allLeftMetadata.removeEndpoint(endpoint);
             }
 
-            // Ranges being relocated.
-            for (Map.Entry<Token, InetAddress> relocating : 
relocatingTokens.entrySet())
-            {
-                InetAddress endpoint = relocating.getValue(); // address of 
the moving node
-                Token token = relocating.getKey();
-
-                allLeftMetadata.updateNormalToken(token, endpoint);
-
-                for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                    newPendingRanges.put(range, endpoint);
-
-                allLeftMetadata.removeEndpoint(endpoint);
-            }
-
             pendingRanges.put(keyspaceName, newPendingRanges);
 
             if (logger.isDebugEnabled())
@@ -936,7 +922,6 @@ public class TokenMetadata
             leavingEndpoints.clear();
             pendingRanges.clear();
             movingEndpoints.clear();
-            relocatingTokens.clear();
             sortedTokens.clear();
             topology.clear();
             invalidateCachedRings();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java 
b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index d3aa6b6..2276c4a 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -96,77 +96,6 @@ public class PendingRangeCalculatorService
     // public & static for testing purposes
     public static void calculatePendingRanges(AbstractReplicationStrategy 
strategy, String keyspaceName)
     {
-        TokenMetadata tm = StorageService.instance.getTokenMetadata();
-        Multimap<Range<Token>, InetAddress> pendingRanges = 
HashMultimap.create();
-        BiMultiValMap<Token, InetAddress> bootstrapTokens = 
tm.getBootstrapTokens();
-        Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
-
-        if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && 
tm.getMovingEndpoints().isEmpty())
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("No bootstrapping, leaving or moving nodes -> 
empty pending ranges for {}", keyspaceName);
-            tm.setPendingRanges(keyspaceName, pendingRanges);
-            return;
-        }
-
-        Multimap<InetAddress, Range<Token>> addressRanges = 
strategy.getAddressRanges();
-
-        // Copy of metadata reflecting the situation after all leave 
operations are finished.
-        TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
-
-        // get all ranges that will be affected by leaving nodes
-        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-        for (InetAddress endpoint : leavingEndpoints)
-            affectedRanges.addAll(addressRanges.get(endpoint));
-
-        // for each of those ranges, find what new nodes will be responsible 
for the range when
-        // all leaving nodes are gone.
-        for (Range<Token> range : affectedRanges)
-        {
-            Set<InetAddress> currentEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, 
tm.cloneOnlyTokenMap()));
-            Set<InetAddress> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, 
allLeftMetadata));
-            pendingRanges.putAll(range, Sets.difference(newEndpoints, 
currentEndpoints));
-        }
-
-        // At this stage pendingRanges has been updated according to leave 
operations. We can
-        // now continue the calculation by checking bootstrapping nodes.
-
-        // For each of the bootstrapping nodes, simply add and remove them one 
by one to
-        // allLeftMetadata and check in between what their ranges would be.
-        Multimap<InetAddress, Token> bootstrapAddresses = 
bootstrapTokens.inverse();
-        for (InetAddress endpoint : bootstrapAddresses.keySet())
-        {
-            Collection<Token> tokens = bootstrapAddresses.get(endpoint);
-
-            allLeftMetadata.updateNormalTokens(tokens, endpoint);
-            for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                pendingRanges.put(range, endpoint);
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
-        // At this stage pendingRanges has been updated according to leaving 
and bootstrapping nodes.
-        // We can now finish the calculation by checking moving and relocating 
nodes.
-
-        // For each of the moving nodes, we do the same thing we did for 
bootstrapping:
-        // simply add and remove them one by one to allLeftMetadata and check 
in between what their ranges would be.
-        for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
-        {
-            InetAddress endpoint = moving.right; // address of the moving node
-
-            //  moving.left is a new token of the endpoint
-            allLeftMetadata.updateNormalToken(moving.left, endpoint);
-
-            for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-            {
-                pendingRanges.put(range, endpoint);
-            }
-
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
-        tm.setPendingRanges(keyspaceName, pendingRanges);
-
-        if (logger.isDebugEnabled())
-            logger.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? 
"<empty>" : tm.printPendingRanges()));
+        
StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, 
keyspaceName);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
 
b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
deleted file mode 100644
index b8117b9..0000000
--- 
a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Date;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ScheduledRangeTransferExecutorService
-{
-    private static final Logger LOG = 
LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class);
-    private static final int INTERVAL = 10;
-    private ScheduledExecutorService scheduler;
-
-    public void setup()
-    {
-        if (DatabaseDescriptor.getNumTokens() == 1)
-        {
-            LOG.warn("Cannot start range transfer scheduler: endpoint is not 
virtual nodes-enabled");
-            return;
-        }
-
-        scheduler = Executors.newSingleThreadScheduledExecutor(new 
RangeTransferThreadFactory());
-        scheduler.scheduleWithFixedDelay(new RangeTransfer(), 0, INTERVAL, 
TimeUnit.SECONDS);
-        LOG.info("Enabling scheduled transfers of token ranges");
-    }
-
-    public void tearDown()
-    {
-        if (scheduler == null)
-        {
-            LOG.warn("Unable to shutdown; Scheduler never enabled");
-            return;
-        }
-
-        LOG.info("Shutting down range transfer scheduler");
-        scheduler.shutdownNow();
-    }
-}
-
-class RangeTransfer implements Runnable
-{
-    private static final Logger LOG = 
LoggerFactory.getLogger(RangeTransfer.class);
-
-    public void run()
-    {
-        UntypedResultSet res = executeInternal("SELECT * FROM system." + 
SystemKeyspace.RANGE_XFERS_CF);
-
-        if (res.size() < 1)
-        {
-            LOG.info("No queued ranges to transfer, shuffle complete.  Run 
'cassandra-shuffle disable' to stop this message.");
-            return;
-        }
-
-        if (!isReady())
-            return;
-
-        UntypedResultSet.Row row = res.iterator().next();
-
-        Date requestedAt = row.getTimestamp("requested_at");
-        ByteBuffer tokenBytes = row.getBytes("token_bytes");
-        Token token = 
StorageService.getPartitioner().getTokenFactory().fromByteArray(tokenBytes);
-
-        LOG.info("Initiating transfer of {} (scheduled at {})", token, 
requestedAt.toString());
-        try
-        {
-            
StorageService.instance.relocateTokens(Collections.singleton(token));
-        }
-        catch (Exception e)
-        {
-            LOG.error("Error removing {}: {}", token, e);
-        }
-        finally
-        {
-            LOG.debug("Removing queued entry for transfer of {}", token);
-            executeInternal(String.format("DELETE FROM system.%s WHERE 
token_bytes = ?", SystemKeyspace.RANGE_XFERS_CF), tokenBytes);
-        }
-    }
-
-    private boolean isReady()
-    {
-        int targetTokens = DatabaseDescriptor.getNumTokens();
-        int highMark = (int)Math.ceil(targetTokens + (targetTokens * .10));
-        int actualTokens = StorageService.instance.getTokens().size();
-
-        if (actualTokens >= highMark)
-        {
-            LOG.warn("Pausing until token count stabilizes (target={}, 
actual={})", targetTokens, actualTokens);
-            return false;
-        }
-
-        return true;
-    }
-}
-
-class RangeTransferThreadFactory implements ThreadFactory
-{
-    private AtomicInteger count = new AtomicInteger(0);
-
-    public Thread newThread(Runnable r)
-    {
-        Thread rangeXferThread = new Thread(r);
-        rangeXferThread.setName(String.format("ScheduledRangeXfers:%d", 
count.getAndIncrement()));
-        return rangeXferThread;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/test/unit/org/apache/cassandra/service/RelocateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RelocateTest.java 
b/test/unit/org/apache/cassandra/service/RelocateTest.java
deleted file mode 100644
index 22a992c..0000000
--- a/test/unit/org/apache/cassandra/service/RelocateTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.junit.Assert.*;
-
-import java.math.BigInteger;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.SimpleSnitch;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-public class RelocateTest
-{
-    private static final int TOKENS_PER_NODE = 256;
-    private static final int TOKEN_STEP = 10;
-    private static final IPartitioner<?> partitioner = new RandomPartitioner();
-    private static IPartitioner<?> oldPartitioner;
-    private static VersionedValue.VersionedValueFactory vvFactory;
-
-    private StorageService ss = StorageService.instance;
-    private TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-
-    @Before
-    public void init()
-    {
-        tmd.clearUnsafe();
-    }
-
-    @BeforeClass
-    public static void setUp() throws Exception
-    {
-        oldPartitioner = 
StorageService.instance.setPartitionerUnsafe(partitioner);
-        SchemaLoader.loadSchema();
-        vvFactory = new VersionedValue.VersionedValueFactory(partitioner);
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception
-    {
-        StorageService.instance.setPartitionerUnsafe(oldPartitioner);
-    }
-
-    /** Setup a virtual node ring */
-    private static Map<Token<?>, InetAddress> createInitialRing(int size) 
throws UnknownHostException
-    {
-        Map<Token<?>, InetAddress> tokenMap = new HashMap<Token<?>, 
InetAddress>();
-        int currentToken = TOKEN_STEP;
-
-        for(int i = 0; i < size; i++)
-        {
-            InetAddress endpoint = InetAddress.getByName("127.0.0." + 
String.valueOf(i + 1));
-            Gossiper.instance.initializeNodeUnsafe(endpoint, 
UUID.randomUUID(), 1);
-            List<Token> tokens = new ArrayList<Token>();
-
-            for (int j = 0; j < TOKENS_PER_NODE; j++)
-            {
-                Token token = new 
BigIntegerToken(String.valueOf(currentToken));
-                tokenMap.put(token, endpoint);
-                tokens.add(token);
-                currentToken += TOKEN_STEP;
-            }
-
-            Gossiper.instance.injectApplicationState(endpoint, 
ApplicationState.TOKENS, vvFactory.tokens(tokens));
-            StorageService.instance.onChange(endpoint, 
ApplicationState.STATUS, vvFactory.normal(tokens));
-        }
-
-        return tokenMap;
-    }
-
-    // Copy-paste from MoveTest.java
-    private AbstractReplicationStrategy getStrategy(String keyspaceName, 
TokenMetadata tmd)
-    {
-        KSMetaData ksmd = Schema.instance.getKSMetaData(keyspaceName);
-        return AbstractReplicationStrategy.createReplicationStrategy(
-                keyspaceName,
-                ksmd.strategyClass,
-                tmd,
-                new SimpleSnitch(),
-                ksmd.strategyOptions);
-    }
-
-    /** Ensure proper write endpoints during relocation */
-    @Test
-    public void testWriteEndpointsDuringRelocate() throws Exception
-    {
-        Map<Token<?>, InetAddress> tokenMap = createInitialRing(5);
-        Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, 
List<InetAddress>>();
-
-
-        for (Token<?> token : tokenMap.keySet())
-        {
-            BigIntegerToken keyToken = new 
BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5")));
-            List<InetAddress> endpoints = new ArrayList<InetAddress>();
-            Iterator<Token> tokenIter = 
TokenMetadata.ringIterator(tmd.sortedTokens(), keyToken, false);
-            while (tokenIter.hasNext())
-            {
-                InetAddress ep = tmd.getEndpoint(tokenIter.next());
-                if (!endpoints.contains(ep))
-                    endpoints.add(ep);
-            }
-            expectedEndpoints.put(keyToken, endpoints);
-        }
-
-        // Relocate the first token from the first endpoint, to the second 
endpoint.
-        Token relocateToken = new BigIntegerToken(String.valueOf(TOKEN_STEP));
-        ss.onChange(
-                InetAddress.getByName("127.0.0.2"),
-                ApplicationState.STATUS,
-                vvFactory.relocating(Collections.singleton(relocateToken)));
-        assertTrue(tmd.isRelocating(relocateToken));
-
-        AbstractReplicationStrategy strategy;
-        for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
-        {
-            strategy = getStrategy(keyspaceName, tmd);
-            for (Token token : tokenMap.keySet())
-            {
-                BigIntegerToken keyToken = new 
BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5")));
-
-                HashSet<InetAddress> actual = new 
HashSet<InetAddress>(tmd.getWriteEndpoints(keyToken, keyspaceName, 
strategy.calculateNaturalEndpoints(keyToken, tmd.cloneOnlyTokenMap())));
-                HashSet<InetAddress> expected = new HashSet<InetAddress>();
-
-                for (int i = 0; i < actual.size(); i++)
-                    expected.add(expectedEndpoints.get(keyToken).get(i));
-
-                assertEquals("mismatched endpoint sets", expected, actual);
-            }
-        }
-    }
-
-    /** Use STATUS changes to trigger membership update and validate results. 
*/
-    @Test
-    public void testRelocationSuccess() throws UnknownHostException
-    {
-        createInitialRing(5);
-
-        // Node handling the relocation (dst), and the token being relocated 
(src).
-        InetAddress relocator = InetAddress.getByName("127.0.0.3");
-        Token relocatee = new BigIntegerToken(String.valueOf(TOKEN_STEP));
-
-        // Send RELOCATING and ensure token status
-        ss.onChange(relocator, ApplicationState.STATUS, 
vvFactory.relocating(Collections.singleton(relocatee)));
-        assertTrue(tmd.isRelocating(relocatee));
-
-        // Create a list of the endpoint's existing tokens, and add the 
relocatee to it.
-        List<Token> tokens = new ArrayList<Token>(tmd.getTokens(relocator));
-        SystemKeyspace.updateTokens(tokens);
-        tokens.add(relocatee);
-
-        // Send a normal status, then ensure all is copesetic.
-        Gossiper.instance.injectApplicationState(relocator, 
ApplicationState.TOKENS, vvFactory.tokens(tokens));
-        ss.onChange(relocator, ApplicationState.STATUS, 
vvFactory.normal(tokens));
-
-        // Relocating entries are removed after RING_DELAY
-        Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY + 10, 
TimeUnit.MILLISECONDS);
-
-        assertTrue(!tmd.isRelocating(relocatee));
-        assertEquals(tmd.getEndpoint(relocatee), relocator);
-    }
-}

Reply via email to