Repository: cassandra Updated Branches: refs/heads/trunk fafcfc787 -> da3fd5d7a
fix merge Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3fd5d7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3fd5d7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3fd5d7 Branch: refs/heads/trunk Commit: da3fd5d7ac2bb0f64a32176c3722b857b71bc654 Parents: fafcfc7 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed Jul 30 22:07:10 2014 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed Jul 30 22:08:40 2014 -0500 ---------------------------------------------------------------------- .../apache/cassandra/locator/TokenMetadata.java | 21 +- .../service/PendingRangeCalculatorService.java | 73 +------ .../ScheduledRangeTransferExecutorService.java | 135 ------------ .../apache/cassandra/service/RelocateTest.java | 204 ------------------- 4 files changed, 4 insertions(+), 429 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index bb5455c..2a6a624 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -701,10 +701,10 @@ public class TokenMetadata { Multimap<Range<Token>, InetAddress> newPendingRanges = HashMultimap.create(); - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty() && relocatingTokens.isEmpty()) + if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) { if (logger.isDebugEnabled()) - logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName); + logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); pendingRanges.put(keyspaceName, newPendingRanges); return; @@ -746,7 +746,7 @@ public class TokenMetadata } // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving and relocating nodes. + // We can now finish the calculation by checking moving nodes. // For each of the moving nodes, we do the same thing we did for bootstrapping: // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. @@ -765,20 +765,6 @@ public class TokenMetadata allLeftMetadata.removeEndpoint(endpoint); } - // Ranges being relocated. - for (Map.Entry<Token, InetAddress> relocating : relocatingTokens.entrySet()) - { - InetAddress endpoint = relocating.getValue(); // address of the moving node - Token token = relocating.getKey(); - - allLeftMetadata.updateNormalToken(token, endpoint); - - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - newPendingRanges.put(range, endpoint); - - allLeftMetadata.removeEndpoint(endpoint); - } - pendingRanges.put(keyspaceName, newPendingRanges); if (logger.isDebugEnabled()) @@ -936,7 +922,6 @@ public class TokenMetadata leavingEndpoints.clear(); pendingRanges.clear(); movingEndpoints.clear(); - relocatingTokens.clear(); sortedTokens.clear(); topology.clear(); invalidateCachedRings(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index d3aa6b6..2276c4a 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -96,77 +96,6 @@ public class PendingRangeCalculatorService // public & static for testing purposes public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName) { - TokenMetadata tm = StorageService.instance.getTokenMetadata(); - Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create(); - BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens(); - Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints(); - - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty()) - { - if (logger.isDebugEnabled()) - logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); - tm.setPendingRanges(keyspaceName, pendingRanges); - return; - } - - Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(); - - // Copy of metadata reflecting the situation after all leave operations are finished. - TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft(); - - // get all ranges that will be affected by leaving nodes - Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>(); - for (InetAddress endpoint : leavingEndpoints) - affectedRanges.addAll(addressRanges.get(endpoint)); - - // for each of those ranges, find what new nodes will be responsible for the range when - // all leaving nodes are gone. - for (Range<Token> range : affectedRanges) - { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap())); - Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints)); - } - - // At this stage pendingRanges has been updated according to leave operations. We can - // now continue the calculation by checking bootstrapping nodes. - - // For each of the bootstrapping nodes, simply add and remove them one by one to - // allLeftMetadata and check in between what their ranges would be. - Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse(); - for (InetAddress endpoint : bootstrapAddresses.keySet()) - { - Collection<Token> tokens = bootstrapAddresses.get(endpoint); - - allLeftMetadata.updateNormalTokens(tokens, endpoint); - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - pendingRanges.put(range, endpoint); - allLeftMetadata.removeEndpoint(endpoint); - } - - // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving and relocating nodes. - - // For each of the moving nodes, we do the same thing we did for bootstrapping: - // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. - for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints()) - { - InetAddress endpoint = moving.right; // address of the moving node - - // moving.left is a new token of the endpoint - allLeftMetadata.updateNormalToken(moving.left, endpoint); - - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - { - pendingRanges.put(range, endpoint); - } - - allLeftMetadata.removeEndpoint(endpoint); - } - - tm.setPendingRanges(keyspaceName, pendingRanges); - - if (logger.isDebugEnabled()) - logger.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges())); + StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java deleted file mode 100644 index b8117b9..0000000 --- a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Date; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.dht.Token; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ScheduledRangeTransferExecutorService -{ - private static final Logger LOG = LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class); - private static final int INTERVAL = 10; - private ScheduledExecutorService scheduler; - - public void setup() - { - if (DatabaseDescriptor.getNumTokens() == 1) - { - LOG.warn("Cannot start range transfer scheduler: endpoint is not virtual nodes-enabled"); - return; - } - - scheduler = Executors.newSingleThreadScheduledExecutor(new RangeTransferThreadFactory()); - scheduler.scheduleWithFixedDelay(new RangeTransfer(), 0, INTERVAL, TimeUnit.SECONDS); - LOG.info("Enabling scheduled transfers of token ranges"); - } - - public void tearDown() - { - if (scheduler == null) - { - LOG.warn("Unable to shutdown; Scheduler never enabled"); - return; - } - - LOG.info("Shutting down range transfer scheduler"); - scheduler.shutdownNow(); - } -} - -class RangeTransfer implements Runnable -{ - private static final Logger LOG = LoggerFactory.getLogger(RangeTransfer.class); - - public void run() - { - UntypedResultSet res = executeInternal("SELECT * FROM system." + SystemKeyspace.RANGE_XFERS_CF); - - if (res.size() < 1) - { - LOG.info("No queued ranges to transfer, shuffle complete. Run 'cassandra-shuffle disable' to stop this message."); - return; - } - - if (!isReady()) - return; - - UntypedResultSet.Row row = res.iterator().next(); - - Date requestedAt = row.getTimestamp("requested_at"); - ByteBuffer tokenBytes = row.getBytes("token_bytes"); - Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(tokenBytes); - - LOG.info("Initiating transfer of {} (scheduled at {})", token, requestedAt.toString()); - try - { - StorageService.instance.relocateTokens(Collections.singleton(token)); - } - catch (Exception e) - { - LOG.error("Error removing {}: {}", token, e); - } - finally - { - LOG.debug("Removing queued entry for transfer of {}", token); - executeInternal(String.format("DELETE FROM system.%s WHERE token_bytes = ?", SystemKeyspace.RANGE_XFERS_CF), tokenBytes); - } - } - - private boolean isReady() - { - int targetTokens = DatabaseDescriptor.getNumTokens(); - int highMark = (int)Math.ceil(targetTokens + (targetTokens * .10)); - int actualTokens = StorageService.instance.getTokens().size(); - - if (actualTokens >= highMark) - { - LOG.warn("Pausing until token count stabilizes (target={}, actual={})", targetTokens, actualTokens); - return false; - } - - return true; - } -} - -class RangeTransferThreadFactory implements ThreadFactory -{ - private AtomicInteger count = new AtomicInteger(0); - - public Thread newThread(Runnable r) - { - Thread rangeXferThread = new Thread(r); - rangeXferThread.setName(String.format("ScheduledRangeXfers:%d", count.getAndIncrement())); - return rangeXferThread; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3fd5d7/test/unit/org/apache/cassandra/service/RelocateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RelocateTest.java b/test/unit/org/apache/cassandra/service/RelocateTest.java deleted file mode 100644 index 22a992c..0000000 --- a/test/unit/org/apache/cassandra/service/RelocateTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import static org.junit.Assert.*; - -import java.math.BigInteger; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.dht.BigIntegerToken; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.RandomPartitioner; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.SimpleSnitch; -import org.apache.cassandra.locator.TokenMetadata; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.util.concurrent.Uninterruptibles; - -public class RelocateTest -{ - private static final int TOKENS_PER_NODE = 256; - private static final int TOKEN_STEP = 10; - private static final IPartitioner<?> partitioner = new RandomPartitioner(); - private static IPartitioner<?> oldPartitioner; - private static VersionedValue.VersionedValueFactory vvFactory; - - private StorageService ss = StorageService.instance; - private TokenMetadata tmd = StorageService.instance.getTokenMetadata(); - - @Before - public void init() - { - tmd.clearUnsafe(); - } - - @BeforeClass - public static void setUp() throws Exception - { - oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner); - SchemaLoader.loadSchema(); - vvFactory = new VersionedValue.VersionedValueFactory(partitioner); - } - - @AfterClass - public static void tearDown() throws Exception - { - StorageService.instance.setPartitionerUnsafe(oldPartitioner); - } - - /** Setup a virtual node ring */ - private static Map<Token<?>, InetAddress> createInitialRing(int size) throws UnknownHostException - { - Map<Token<?>, InetAddress> tokenMap = new HashMap<Token<?>, InetAddress>(); - int currentToken = TOKEN_STEP; - - for(int i = 0; i < size; i++) - { - InetAddress endpoint = InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); - Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1); - List<Token> tokens = new ArrayList<Token>(); - - for (int j = 0; j < TOKENS_PER_NODE; j++) - { - Token token = new BigIntegerToken(String.valueOf(currentToken)); - tokenMap.put(token, endpoint); - tokens.add(token); - currentToken += TOKEN_STEP; - } - - Gossiper.instance.injectApplicationState(endpoint, ApplicationState.TOKENS, vvFactory.tokens(tokens)); - StorageService.instance.onChange(endpoint, ApplicationState.STATUS, vvFactory.normal(tokens)); - } - - return tokenMap; - } - - // Copy-paste from MoveTest.java - private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd) - { - KSMetaData ksmd = Schema.instance.getKSMetaData(keyspaceName); - return AbstractReplicationStrategy.createReplicationStrategy( - keyspaceName, - ksmd.strategyClass, - tmd, - new SimpleSnitch(), - ksmd.strategyOptions); - } - - /** Ensure proper write endpoints during relocation */ - @Test - public void testWriteEndpointsDuringRelocate() throws Exception - { - Map<Token<?>, InetAddress> tokenMap = createInitialRing(5); - Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, List<InetAddress>>(); - - - for (Token<?> token : tokenMap.keySet()) - { - BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5"))); - List<InetAddress> endpoints = new ArrayList<InetAddress>(); - Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), keyToken, false); - while (tokenIter.hasNext()) - { - InetAddress ep = tmd.getEndpoint(tokenIter.next()); - if (!endpoints.contains(ep)) - endpoints.add(ep); - } - expectedEndpoints.put(keyToken, endpoints); - } - - // Relocate the first token from the first endpoint, to the second endpoint. - Token relocateToken = new BigIntegerToken(String.valueOf(TOKEN_STEP)); - ss.onChange( - InetAddress.getByName("127.0.0.2"), - ApplicationState.STATUS, - vvFactory.relocating(Collections.singleton(relocateToken))); - assertTrue(tmd.isRelocating(relocateToken)); - - AbstractReplicationStrategy strategy; - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) - { - strategy = getStrategy(keyspaceName, tmd); - for (Token token : tokenMap.keySet()) - { - BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5"))); - - HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(keyToken, keyspaceName, strategy.calculateNaturalEndpoints(keyToken, tmd.cloneOnlyTokenMap()))); - HashSet<InetAddress> expected = new HashSet<InetAddress>(); - - for (int i = 0; i < actual.size(); i++) - expected.add(expectedEndpoints.get(keyToken).get(i)); - - assertEquals("mismatched endpoint sets", expected, actual); - } - } - } - - /** Use STATUS changes to trigger membership update and validate results. */ - @Test - public void testRelocationSuccess() throws UnknownHostException - { - createInitialRing(5); - - // Node handling the relocation (dst), and the token being relocated (src). - InetAddress relocator = InetAddress.getByName("127.0.0.3"); - Token relocatee = new BigIntegerToken(String.valueOf(TOKEN_STEP)); - - // Send RELOCATING and ensure token status - ss.onChange(relocator, ApplicationState.STATUS, vvFactory.relocating(Collections.singleton(relocatee))); - assertTrue(tmd.isRelocating(relocatee)); - - // Create a list of the endpoint's existing tokens, and add the relocatee to it. - List<Token> tokens = new ArrayList<Token>(tmd.getTokens(relocator)); - SystemKeyspace.updateTokens(tokens); - tokens.add(relocatee); - - // Send a normal status, then ensure all is copesetic. - Gossiper.instance.injectApplicationState(relocator, ApplicationState.TOKENS, vvFactory.tokens(tokens)); - ss.onChange(relocator, ApplicationState.STATUS, vvFactory.normal(tokens)); - - // Relocating entries are removed after RING_DELAY - Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY + 10, TimeUnit.MILLISECONDS); - - assertTrue(!tmd.isRelocating(relocatee)); - assertEquals(tmd.getEndpoint(relocatee), relocator); - } -}