Remove shuffle/taketoken. Patch by brandonwilliams, reviewed by thobbs for CASSANDRA-7601
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/440d2360 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/440d2360 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/440d2360 Branch: refs/heads/cassandra-2.1.0 Commit: 440d23603855d25592bd36a8832ea3af522fb923 Parents: 7e1adb4 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed Jul 30 11:51:55 2014 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed Jul 30 11:51:55 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 + bin/cassandra-shuffle | 57 -- debian/cassandra.install | 1 - .../apache/cassandra/gms/VersionedValue.java | 7 - .../apache/cassandra/locator/TokenMetadata.java | 102 +-- .../service/PendingRangeCalculatorService.java | 16 +- .../ScheduledRangeTransferExecutorService.java | 138 ---- .../cassandra/service/StorageService.java | 150 +--- .../cassandra/service/StorageServiceMBean.java | 10 - .../org/apache/cassandra/tools/NodeCmd.java | 6 - .../org/apache/cassandra/tools/NodeProbe.java | 5 - .../org/apache/cassandra/tools/Shuffle.java | 722 ------------------- .../apache/cassandra/tools/NodeToolHelp.yaml | 3 - .../apache/cassandra/service/RelocateTest.java | 206 ------ 15 files changed, 8 insertions(+), 1419 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7f7d2bc..4f08764 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.10 + * Remove shuffle and taketoken (CASSANDRA-7601) * Switch liveRatio-related log messages to DEBUG (CASSANDRA-7467) * (cqlsh) Add tab-completion for CREATE/DROP USER IF [NOT] EXISTS (CASSANDRA-7611) * Always merge ranges owned by a single node (CASSANDRA-6930) http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index ea085ae..93fe0b1 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -20,6 +20,9 @@ New features - If you are using Leveled Compaction, you can now disable doing size-tiered compaction in L0 by starting Cassandra with -Dcassandra.disable_stcs_in_l0 (see CASSANDRA-6621 for details). + - Shuffle and taketoken have been removed. For clusters that choose to + upgrade to vnodes, creating a new datacenter with vnodes and migrating is + recommended. See http://goo.gl/Sna2S1 for further information. 2.0.9 http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/bin/cassandra-shuffle ---------------------------------------------------------------------- diff --git a/bin/cassandra-shuffle b/bin/cassandra-shuffle deleted file mode 100755 index 53636f7..0000000 --- a/bin/cassandra-shuffle +++ /dev/null @@ -1,57 +0,0 @@ -#!/bin/sh -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -if [ "x$CASSANDRA_INCLUDE" = "x" ]; then - for include in /usr/share/cassandra/cassandra.in.sh \ - /usr/local/share/cassandra/cassandra.in.sh \ - /opt/cassandra/cassandra.in.sh \ - "`dirname "$0"`/cassandra.in.sh"; do - if [ -r "$include" ]; then - . "$include" - break - fi - done -elif [ -r "$CASSANDRA_INCLUDE" ]; then - . "$CASSANDRA_INCLUDE" -fi - -# Use JAVA_HOME if set, otherwise look for java in PATH -if [ -x "$JAVA_HOME/bin/java" ]; then - JAVA="$JAVA_HOME/bin/java" -else - JAVA="`which java`" -fi - -if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then - echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2 - exit 1 -fi - -# Special-case path variables. -case "`uname`" in - CYGWIN*) - CLASSPATH="`cygpath -p -w "$CLASSPATH"`" - CASSANDRA_CONF="`cygpath -p -w "$CASSANDRA_CONF"`" - ;; -esac - -"$JAVA" -cp "$CLASSPATH" \ - -Xmx32m \ - -Dlog4j.configuration=log4j-tools.properties \ - org.apache.cassandra.tools.Shuffle $@ - -# vi:ai sw=4 ts=4 tw=0 et http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/debian/cassandra.install ---------------------------------------------------------------------- diff --git a/debian/cassandra.install b/debian/cassandra.install index e0182a7..4f3115b 100644 --- a/debian/cassandra.install +++ b/debian/cassandra.install @@ -21,7 +21,6 @@ bin/cqlsh usr/bin bin/sstablescrub usr/bin bin/sstableupgrade usr/bin bin/sstablesplit usr/bin -bin/cassandra-shuffle usr/bin tools/bin/cassandra-stress usr/bin tools/bin/token-generator usr/bin tools/bin/sstablelevelreset usr/bin http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index a7ee047..565a8cb 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -64,7 +64,6 @@ public class VersionedValue implements Comparable<VersionedValue> public final static String STATUS_LEAVING = "LEAVING"; public final static String STATUS_LEFT = "LEFT"; public final static String STATUS_MOVING = "MOVING"; - public final static String STATUS_RELOCATING = "RELOCATING"; public final static String REMOVING_TOKEN = "removing"; public final static String REMOVED_TOKEN = "removed"; @@ -168,12 +167,6 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); } - public VersionedValue relocating(Collection<Token> srcTokens) - { - return new VersionedValue( - versionString(VersionedValue.STATUS_RELOCATING, StringUtils.join(srcTokens, VersionedValue.DELIMITER))); - } - public VersionedValue hostId(UUID hostId) { return new VersionedValue(hostId.toString()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 54170f0..a673c94 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -87,9 +87,6 @@ public class TokenMetadata // nodes which are migrating to the new tokens in the ring private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>(); - // tokens which are migrating to new endpoints - private final ConcurrentMap<Token, InetAddress> relocatingTokens = new ConcurrentHashMap<Token, InetAddress>(); - /* Use this lock for manipulating the token map */ private final ReadWriteLock lock = new ReentrantReadWriteLock(true); private volatile ArrayList<Token> sortedTokens; @@ -388,33 +385,6 @@ public class TokenMetadata } } - /** - * Add new relocating ranges (tokens moving from their respective endpoints, to another). - * @param tokens tokens being moved - * @param endpoint destination of moves - */ - public void addRelocatingTokens(Collection<Token> tokens, InetAddress endpoint) - { - assert endpoint != null; - assert tokens != null && tokens.size() > 0; - - lock.writeLock().lock(); - - try - { - for (Token token : tokens) - { - InetAddress prev = relocatingTokens.put(token, endpoint); - if (prev != null && !prev.equals(endpoint)) - logger.warn("Relocation of {} to {} overwrites previous to {}", new Object[]{token, endpoint, prev}); - } - } - finally - { - lock.writeLock().unlock(); - } - } - public void removeEndpoint(InetAddress endpoint) { assert endpoint != null; @@ -464,38 +434,6 @@ public class TokenMetadata } } - /** - * Remove pair of token/address from relocating ranges. - * @param endpoint - */ - public void removeFromRelocating(Token token, InetAddress endpoint) - { - assert endpoint != null; - assert token != null; - - lock.writeLock().lock(); - - try - { - InetAddress previous = relocatingTokens.remove(token); - - if (previous == null) - { - logger.debug("Cannot remove {}, not found among the relocating (previously removed?)", token); - } - else if (!previous.equals(endpoint)) - { - logger.warn( - "Removal of relocating token {} with mismatched endpoint ({} != {})", - new Object[]{token, endpoint, previous}); - } - } - finally - { - lock.writeLock().unlock(); - } - } - public Collection<Token> getTokens(InetAddress endpoint) { assert endpoint != null; @@ -570,22 +508,6 @@ public class TokenMetadata } } - public boolean isRelocating(Token token) - { - assert token != null; - - lock.readLock().lock(); - - try - { - return relocatingTokens.containsKey(token); - } - finally - { - lock.readLock().unlock(); - } - } - private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>(); /** @@ -658,7 +580,7 @@ public class TokenMetadata /** * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all - * current leave, move, and relocate operations have finished. + * current leave, and move operations have finished. * * @return new token metadata */ @@ -677,9 +599,6 @@ public class TokenMetadata for (Pair<Token, InetAddress> pair : movingEndpoints) metadata.updateNormalToken(pair.left, pair.right); - for (Map.Entry<Token, InetAddress> relocating: relocatingTokens.entrySet()) - metadata.updateNormalToken(relocating.getKey(), relocating.getValue()); - return metadata; } finally @@ -831,15 +750,6 @@ public class TokenMetadata } } - /** - * Ranges which are migrating to new endpoints. - * @return set of token-address pairs of relocating ranges - */ - public Map<Token, InetAddress> getRelocatingRanges() - { - return relocatingTokens; - } - public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin) { assert ring.size() > 0; @@ -994,16 +904,6 @@ public class TokenMetadata return sb.toString(); } - public String printRelocatingRanges() - { - StringBuilder sb = new StringBuilder(); - - for (Map.Entry<Token, InetAddress> entry : relocatingTokens.entrySet()) - sb.append(String.format("%s:%s%n", entry.getKey(), entry.getValue())); - - return sb.toString(); - } - public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName) { Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(keyspaceName); http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index 59a8843..a9968d2 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -122,7 +122,7 @@ public class PendingRangeCalculatorService extends PendingRangeCalculatorService BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens(); Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints(); - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty()) + if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty()) { if (logger.isDebugEnabled()) logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName); @@ -185,20 +185,6 @@ public class PendingRangeCalculatorService extends PendingRangeCalculatorService allLeftMetadata.removeEndpoint(endpoint); } - // Ranges being relocated. - for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet()) - { - InetAddress endpoint = relocating.getValue(); // address of the moving node - Token token = relocating.getKey(); - - allLeftMetadata.updateNormalToken(token, endpoint); - - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - pendingRanges.put(range, endpoint); - - allLeftMetadata.removeEndpoint(endpoint); - } - tm.setPendingRanges(keyspaceName, pendingRanges); if (logger.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java deleted file mode 100644 index 5591ea4..0000000 --- a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import static org.apache.cassandra.cql3.QueryProcessor.processInternal; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Date; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ScheduledRangeTransferExecutorService -{ - private static final Logger LOG = LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class); - private static final int INTERVAL = 10; - private ScheduledExecutorService scheduler; - - public void setup() - { - if (DatabaseDescriptor.getNumTokens() == 1) - { - LOG.warn("Cannot start range transfer scheduler: endpoint is not virtual nodes-enabled"); - return; - } - - scheduler = Executors.newSingleThreadScheduledExecutor(new RangeTransferThreadFactory()); - scheduler.scheduleWithFixedDelay(new RangeTransfer(), 0, INTERVAL, TimeUnit.SECONDS); - LOG.info("Enabling scheduled transfers of token ranges"); - } - - public void tearDown() - { - if (scheduler == null) - { - LOG.warn("Unable to shutdown; Scheduler never enabled"); - return; - } - - LOG.info("Shutting down range transfer scheduler"); - scheduler.shutdownNow(); - } -} - -class RangeTransfer implements Runnable -{ - private static final Logger LOG = LoggerFactory.getLogger(RangeTransfer.class); - - public void run() - { - UntypedResultSet res = processInternal("SELECT * FROM system." + SystemKeyspace.RANGE_XFERS_CF); - - if (res.size() < 1) - { - LOG.info("No queued ranges to transfer, shuffle complete. Run 'cassandra-shuffle disable' to stop this message."); - return; - } - - if (!isReady()) - return; - - UntypedResultSet.Row row = res.iterator().next(); - - Date requestedAt = row.getTimestamp("requested_at"); - ByteBuffer tokenBytes = row.getBytes("token_bytes"); - Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(tokenBytes); - - LOG.info("Initiating transfer of {} (scheduled at {})", token, requestedAt.toString()); - try - { - StorageService.instance.relocateTokens(Collections.singleton(token)); - } - catch (Exception e) - { - LOG.error("Error removing {}: {}", token, e); - } - finally - { - LOG.debug("Removing queued entry for transfer of {}", token); - processInternal(String.format("DELETE FROM system.%s WHERE token_bytes = 0x%s", - SystemKeyspace.RANGE_XFERS_CF, - ByteBufferUtil.bytesToHex(tokenBytes))); - } - } - - private boolean isReady() - { - int targetTokens = DatabaseDescriptor.getNumTokens(); - int highMark = (int)Math.ceil(targetTokens + (targetTokens * .10)); - int actualTokens = StorageService.instance.getTokens().size(); - - if (actualTokens >= highMark) - { - LOG.warn("Pausing until token count stabilizes (target={}, actual={})", targetTokens, actualTokens); - return false; - } - - return true; - } -} - -class RangeTransferThreadFactory implements ThreadFactory -{ - private AtomicInteger count = new AtomicInteger(0); - - public Thread newThread(Runnable r) - { - Thread rangeXferThread = new Thread(r); - rangeXferThread.setName(String.format("ScheduledRangeXfers:%d", count.getAndIncrement())); - return rangeXferThread; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c6f4ec9..8a1b3dc 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -175,7 +175,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */ private double tracingProbability = 0.0; - private static enum Mode { STARTING, NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, RELOCATING } + private static enum Mode { STARTING, NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED } private Mode operationMode = Mode.STARTING; /* Used for tracking drain progress */ @@ -183,8 +183,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private static final AtomicInteger nextRepairCommand = new AtomicInteger(); - private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService(); - private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>(); private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor(); @@ -1358,8 +1356,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * Other STATUS values that may be seen (possibly anywhere in the normal progression): * STATUS_MOVING,newtoken * set if node is currently moving to a new token in the ring - * STATUS_RELOCATING,srcToken,srcToken,srcToken,... - * set if the endpoint is in the process of relocating a token to itself * REMOVING_TOKEN,deadtoken * set if the node is dead and is being removed by its REMOVAL_COORDINATOR * REMOVED_TOKEN,deadtoken @@ -1390,8 +1386,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE handleStateLeft(endpoint, pieces); else if (moveName.equals(VersionedValue.STATUS_MOVING)) handleStateMoving(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_RELOCATING)) - handleStateRelocating(endpoint, pieces); } else { @@ -1612,33 +1606,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (!isClientMode) tokensToUpdateInSystemKeyspace.add(token); } - else if (tokenMetadata.isRelocating(token) && tokenMetadata.getRelocatingRanges().get(token).equals(endpoint)) - { - // Token was relocating, this is the bookkeeping that makes it official. - tokensToUpdateInMetadata.add(token); - if (!isClientMode) - tokensToUpdateInSystemKeyspace.add(token); - - optionalTasks.schedule(new Runnable() - { - public void run() - { - logger.info("Removing RELOCATION state for {} {}", endpoint, token); - getTokenMetadata().removeFromRelocating(token, endpoint); - } - }, RING_DELAY, TimeUnit.MILLISECONDS); - - // We used to own this token; This token will need to be removed from system.local - if (currentOwner.equals(FBUtilities.getBroadcastAddress())) - localTokensToRemove.add(token); - - logger.info("Token {} relocated to {}", token, endpoint); - } - else if (tokenMetadata.isRelocating(token)) - { - logger.info("Token {} is relocating to {}, ignoring update from {}", - token, tokenMetadata.getRelocatingRanges().get(token), endpoint); - } else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0) { tokensToUpdateInMetadata.add(token); @@ -1657,8 +1624,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE currentOwner, token, endpoint)); - if (logger.isDebugEnabled()) - logger.debug("Relocating ranges: {}", tokenMetadata.printRelocatingRanges()); } else { @@ -1667,8 +1632,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE currentOwner, token, endpoint)); - if (logger.isDebugEnabled()) - logger.debug("Relocating ranges: {}", tokenMetadata.printRelocatingRanges()); } } @@ -1767,26 +1730,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } /** - * Handle one or more ranges (tokens) moving from their respective endpoints, to another. - * - * @param endpoint the destination of the move - * @param pieces STATE_RELOCATING,token,token,... - */ - private void handleStateRelocating(InetAddress endpoint, String[] pieces) - { - assert pieces.length >= 2; - - List<Token> tokens = new ArrayList<Token>(pieces.length - 1); - for (String tStr : Arrays.copyOfRange(pieces, 1, pieces.length)) - tokens.add(getPartitioner().getTokenFactory().fromString(tStr)); - - logger.debug("Tokens {} are relocating to {}", tokens, endpoint); - tokenMetadata.addRelocatingTokens(tokens, endpoint); - - PendingRangeCalculatorService.instance.update(); - } - - /** * Handle notification that a node being actively removed from the ring via 'removenode' * * @param endpoint node @@ -3180,7 +3123,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private class RangeRelocator { - private StreamPlan streamPlan = new StreamPlan("Relocation"); + private StreamPlan streamPlan = new StreamPlan("Moving"); private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames) { @@ -3270,85 +3213,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - public void relocate(Collection<String> srcTokens) throws IOException - { - List<Token> tokens = new ArrayList<Token>(srcTokens.size()); - try - { - for (String srcT : srcTokens) - { - getPartitioner().getTokenFactory().validate(srcT); - Token token = getPartitioner().getTokenFactory().fromString(srcT); - if (tokenMetadata.getTokens(tokenMetadata.getEndpoint(token)).size() < 2) - throw new IOException("Cannot relocate " + srcT + "; source node would have no tokens left"); - tokens.add(getPartitioner().getTokenFactory().fromString(srcT)); - } - } - catch (ConfigurationException e) - { - throw new IOException(e.getMessage()); - } - relocateTokens(tokens); - } - - void relocateTokens(Collection<Token> srcTokens) - { - assert srcTokens != null; - InetAddress localAddress = FBUtilities.getBroadcastAddress(); - Collection<Token> localTokens = getTokenMetadata().getTokens(localAddress); - Set<Token> tokens = new HashSet<Token>(srcTokens); - - Iterator<Token> it = tokens.iterator(); - while (it.hasNext()) - { - Token srcT = it.next(); - if (localTokens.contains(srcT)) - { - it.remove(); - logger.warn("cannot move {}; source and destination match", srcT); - } - } - - if (tokens.size() < 1) - { - logger.warn("no valid token arguments specified; nothing to relocate"); - return; - } - - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.relocating(tokens)); - setMode(Mode.RELOCATING, String.format("relocating %s to %s", tokens, localAddress.getHostAddress()), true); - - List<String> keyspaceNames = Schema.instance.getNonSystemKeyspaces(); - - setMode(Mode.RELOCATING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true); - Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS); - - RangeRelocator relocator = new RangeRelocator(tokens, keyspaceNames); - - if (relocator.streamsNeeded()) - { - setMode(Mode.RELOCATING, "fetching new ranges and streaming old ranges", true); - try - { - relocator.stream().get(); - } - catch (ExecutionException | InterruptedException e) - { - throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage()); - } - } - else - { - setMode(Mode.RELOCATING, "no new ranges to stream/fetch", true); - } - - Collection<Token> currentTokens = SystemKeyspace.updateLocalTokens(tokens, Collections.<Token>emptyList()); - tokenMetadata.updateNormalTokens(currentTokens, FBUtilities.getBroadcastAddress()); - Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(currentTokens)); - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(currentTokens)); - setMode(Mode.NORMAL, false); - } - /** * Get the status of a token removal. */ @@ -3962,16 +3826,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return tracingProbability; } - public void enableScheduledRangeXfers() - { - rangeXferExecutor.setup(); - } - - public void disableScheduledRangeXfers() - { - rangeXferExecutor.tearDown(); - } - public void disableAutoCompaction(String ks, String... columnFamilies) throws IOException { for (ColumnFamilyStore cfs : getValidColumnFamilies(true, true, ks, columnFamilies)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 9b1487f..f578e10 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -318,11 +318,6 @@ public interface StorageServiceMBean extends NotificationEmitter public void move(String newToken) throws IOException; /** - * @param srcTokens tokens to move to this node - */ - public void relocate(Collection<String> srcTokens) throws IOException; - - /** * removeToken removes token (and all data associated with * enpoint that had it) from the ring */ @@ -491,11 +486,6 @@ public interface StorageServiceMBean extends NotificationEmitter */ public double getTracingProbability(); - /** Begin processing of queued range transfers. */ - public void enableScheduledRangeXfers(); - /** Disable processing of queued range transfers. */ - public void disableScheduledRangeXfers(); - void disableAutoCompaction(String ks, String ... columnFamilies) throws IOException; void enableAutoCompaction(String ks, String ... columnFamilies) throws IOException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index afa42dd..1d156ad 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -175,7 +175,6 @@ public class NodeCmd STATUSTHRIFT, STOP, STOPDAEMON, - TAKETOKEN, TPSTATS, TRUNCATEHINTS, UPGRADESSTABLES, @@ -1328,11 +1327,6 @@ public class NodeCmd probe.setTraceProbability(Double.parseDouble(arguments[0])); break; - case TAKETOKEN: - if (arguments.length < 1) { badUse("Must supply at least one token to take"); } - probe.takeTokens(arguments); - break; - case REBUILD : if (arguments.length > 1) { badUse("Too many arguments."); } probe.rebuild(arguments.length == 1 ? arguments[0] : null); http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index f0ee15d..fb12b30 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -471,11 +471,6 @@ public class NodeProbe ssProxy.move(newToken); } - public void takeTokens(String[] tokens) throws IOException - { - ssProxy.relocate(Arrays.asList(tokens)); - } - public void removeNode(String token) { ssProxy.removeNode(token); http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/java/org/apache/cassandra/tools/Shuffle.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/Shuffle.java b/src/java/org/apache/cassandra/tools/Shuffle.java deleted file mode 100644 index 7cd9d31..0000000 --- a/src/java/org/apache/cassandra/tools/Shuffle.java +++ /dev/null @@ -1,722 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.cassandra.tools; - -import java.io.Closeable; -import java.io.IOException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import javax.management.JMX; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.MissingArgumentException; - -import org.apache.cassandra.serializers.TimestampSerializer; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.EndpointSnitchInfoMBean; -import org.apache.cassandra.service.StorageServiceMBean; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TFastFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; - -public class Shuffle extends AbstractJmxClient -{ - private static final String ssObjName = "org.apache.cassandra.db:type=StorageService"; - private static final String epSnitchObjName = "org.apache.cassandra.db:type=EndpointSnitchInfo"; - - private StorageServiceMBean ssProxy = null; - private Random rand = new Random(System.currentTimeMillis()); - private final String thriftHost; - private final int thriftPort; - private final boolean thriftFramed; - private final String thriftUsername; - private final String thriftPassword; - - static - { - addCmdOption("th", "thrift-host", true, "Thrift hostname or IP address (Default: JMX host)"); - addCmdOption("tp", "thrift-port", true, "Thrift port number (Default: 9160)"); - addCmdOption("tf", "thrift-framed", false, "Enable framed transport for Thrift (Default: false)"); - addCmdOption("tu", "thrift-user", true, "Thrift username"); - addCmdOption("tpw", "thrift-password", true, "Thrift password"); - addCmdOption("en", "and-enable", true, "Immediately enable shuffling (create only)"); - addCmdOption("dc", "only-dc", true, "Apply only to named DC (create only)"); - } - - public Shuffle(String host, - int port, - String thriftHost, - int thriftPort, - boolean thriftFramed, - String jmxUsername, - String jmxPassword, - String thriftUsername, - String thriftPassword) throws IOException - { - super(host, port, jmxUsername, jmxPassword); - - this.thriftHost = thriftHost; - this.thriftPort = thriftPort; - this.thriftFramed = thriftFramed; - this.thriftUsername = thriftUsername; - this.thriftPassword = thriftPassword; - - // Setup the StorageService proxy. - ssProxy = getSSProxy(jmxConn.getMbeanServerConn()); - } - - private StorageServiceMBean getSSProxy(MBeanServerConnection mbeanConn) - { - StorageServiceMBean proxy; - try - { - ObjectName name = new ObjectName(ssObjName); - proxy = JMX.newMBeanProxy(mbeanConn, name, StorageServiceMBean.class); - } - catch (MalformedObjectNameException e) - { - throw new RuntimeException(e); - } - return proxy; - } - - private EndpointSnitchInfoMBean getEpSnitchProxy(MBeanServerConnection mbeanConn) - { - EndpointSnitchInfoMBean proxy; - try - { - ObjectName name = new ObjectName(epSnitchObjName); - proxy = JMX.newMBeanProxy(mbeanConn, name, EndpointSnitchInfoMBean.class); - } - catch (MalformedObjectNameException e) - { - throw new RuntimeException(e); - } - return proxy; - } - - /** - * Given a Multimap of endpoint to tokens, return a new randomized mapping. - * - * @param endpointMap current mapping of endpoint to tokens - * @return a new mapping of endpoint to tokens - */ - private Multimap<String, String> calculateRelocations(Multimap<String, String> endpointMap) - { - Multimap<String, String> relocations = HashMultimap.create(); - Set<String> endpoints = new HashSet<>(endpointMap.keySet()); - Map<String, Integer> endpointToNumTokens = new HashMap<>(endpoints.size()); - Map<String, Iterator<String>> iterMap = new HashMap<>(endpoints.size()); - - // Create maps of endpoint to token iterators, and endpoint to number of tokens. - for (String endpoint : endpoints) - { - try - { - endpointToNumTokens.put(endpoint, ssProxy.getTokens(endpoint).size()); - } - catch (UnknownHostException e) - { - throw new RuntimeException("What that...?", e); - } - - iterMap.put(endpoint, endpointMap.get(endpoint).iterator()); - } - - int epsToComplete = endpoints.size(); - Set<String> endpointsCompleted = new HashSet<>(); - - while (true) - { - endpoints.removeAll(endpointsCompleted); - - for (String endpoint : endpoints) - { - boolean choiceMade = false; - - if (!iterMap.get(endpoint).hasNext()) - { - endpointsCompleted.add(endpoint); - continue; - } - - String token = iterMap.get(endpoint).next(); - - List<String> subSet = new ArrayList<>(endpoints); - subSet.remove(endpoint); - Collections.shuffle(subSet, rand); - - for (String choice : subSet) - { - if (relocations.get(choice).size() < endpointToNumTokens.get(choice)) - { - relocations.put(choice, token); - choiceMade = true; - break; - } - } - - if (!choiceMade) - relocations.put(endpoint, token); - } - - // We're done when we've exhausted all of the token iterators - if (endpointsCompleted.size() == epsToComplete) - break; - } - - return relocations; - } - - /** - * Enable relocations. - * - * @param endpoints Collection of hostname or IP strings - */ - private void enableRelocations(Collection<String> endpoints) - { - for (String endpoint : endpoints) - { - try - { - JMXConnection conn = new JMXConnection(endpoint, port, username, password); - getSSProxy(conn.getMbeanServerConn()).enableScheduledRangeXfers(); - conn.close(); - } - catch (IOException e) - { - writeln("Failed to enable shuffling on %s!", endpoint); - } - } - } - - /** - * Disable relocations. - * - * @param endpoints Collection of hostname or IP strings - */ - private void disableRelocations(Collection<String> endpoints) - { - for (String endpoint : endpoints) - { - try - { - JMXConnection conn = new JMXConnection(endpoint, port, username, password); - getSSProxy(conn.getMbeanServerConn()).disableScheduledRangeXfers(); - conn.close(); - } - catch (IOException e) - { - writeln("Failed to enable shuffling on %s!", endpoint); - } - } - } - - /** - * Return a list of the live nodes (using JMX). - * - * @return String endpoint names - * @throws ShuffleError - */ - private Collection<String> getLiveNodes() throws ShuffleError - { - try - { - JMXConnection conn = new JMXConnection(host, port, username, password); - return getSSProxy(conn.getMbeanServerConn()).getLiveNodes(); - } - catch (IOException e) - { - throw new ShuffleError("Error retrieving list of nodes from JMX interface"); - } - } - - /** - * Create and distribute a new, randomized token to endpoint mapping. - * - * @throws ShuffleError on handled exceptions - */ - public void shuffle(boolean enable, String onlyDc) throws ShuffleError - { - Map<String, String> tokenMap; - Multimap<String, String> endpointMap = HashMultimap.create(); - EndpointSnitchInfoMBean epSnitchProxy = getEpSnitchProxy(jmxConn.getMbeanServerConn()); - - try - { - CassandraClient seedClient = getThriftClient(thriftHost); - tokenMap = seedClient.describe_token_map(); - - for (Map.Entry<String, String> entry : tokenMap.entrySet()) - { - String endpoint = entry.getValue(), token = entry.getKey(); - try - { - if (onlyDc != null) - { - if (onlyDc.equals(epSnitchProxy.getDatacenter(endpoint))) - endpointMap.put(endpoint, token); - } - else - endpointMap.put(endpoint, token); - } - catch (UnknownHostException e) - { - writeln("Warning: %s unknown to EndpointSnitch!", endpoint); - } - } - } - catch (InvalidRequestException ire) - { - throw new RuntimeException("What that...?", ire); - } - catch (TException e) - { - throw new ShuffleError(String.format("Thrift request to %s:%d failed: %s", thriftHost, thriftPort, e.getMessage())); - } - - Multimap<String, String> relocations = calculateRelocations(endpointMap); - - writeln("%-42s %-15s %-15s", "Token", "From", "To"); - writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~"); - - IPartitioner<?> partitioner = getPartitioner(); - - // Store relocations on remote nodes. - for (String endpoint : relocations.keySet()) - { - for (String tok : relocations.get(endpoint)) - writeln("%-42s %-15s %-15s", tok, tokenMap.get(tok), endpoint); - - executeCqlQuery(endpoint, createShuffleBatchInsert(relocations.get(endpoint), partitioner)); - } - - if (enable) - enableRelocations(relocations.keySet()); - } - - /** - * Print a list of pending token relocations for all nodes. - * - * @throws ShuffleError - */ - public void ls() throws ShuffleError - { - Map<String, List<CqlRow>> queuedRelocations = listRelocations(); - boolean justOnce = false; - IPartitioner<?> partitioner = getPartitioner(); - - for (String host : queuedRelocations.keySet()) - { - for (CqlRow row : queuedRelocations.get(host)) - { - assert row.getColumns().size() == 2; - - if (!justOnce) - { - writeln("%-42s %-15s %s", "Token", "Endpoint", "Requested at"); - writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); - justOnce = true; - } - - ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue()); - ByteBuffer requestedAt = ByteBuffer.wrap(row.getColumns().get(1).getValue()); - Date time = TimestampSerializer.instance.deserialize(requestedAt); - Token<?> token = partitioner.getTokenFactory().fromByteArray(tokenBytes); - - writeln("%-42s %-15s %s", token.toString(), host, time.toString()); - } - } - } - - /** - * List pending token relocations for all nodes. - * - * @throws ShuffleError - */ - private Map<String, List<CqlRow>> listRelocations() throws ShuffleError - { - String cqlQuery = "SELECT token_bytes,requested_at FROM system.range_xfers"; - Map<String, List<CqlRow>> results = new HashMap<>(); - - for (String host : getLiveNodes()) - { - CqlResult result = executeCqlQuery(host, cqlQuery); - results.put(host, result.getRows()); - } - - return results; - } - - /** - * Clear pending token relocations on all nodes. - * - * @throws ShuffleError - */ - public void clear() throws ShuffleError - { - Map<String, List<CqlRow>> queuedRelocations = listRelocations(); - - for (String host : queuedRelocations.keySet()) - { - for (CqlRow row : queuedRelocations.get(host)) - { - assert row.getColumns().size() == 2; - - ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue()); - String query = String.format("DELETE FROM system.range_xfers WHERE token_bytes = 0x%s", - ByteBufferUtil.bytesToHex(tokenBytes)); - executeCqlQuery(host, query); - } - } - } - - /** - * Enable shuffling on all nodes in the cluster. - * - * @throws ShuffleError - */ - public void enable() throws ShuffleError - { - enableRelocations(getLiveNodes()); - } - - /** - * Disable shuffling on all nodes in the cluster. - * - * @throws ShuffleError - */ - public void disable() throws ShuffleError - { - disableRelocations(getLiveNodes()); - } - - /** - * Setup and return a new Thrift RPC connection. - * - * @param hostName hostname or address to connect to - * @return a CassandraClient instance - * @throws ShuffleError - */ - private CassandraClient getThriftClient(String hostName) throws ShuffleError - { - try - { - return new CassandraClient(hostName, thriftPort, thriftFramed, thriftUsername, thriftPassword); - } - catch (TException e) - { - throw new ShuffleError(String.format("Unable to connect to %s/%d: %s", hostName, port, e.getMessage())); - } - } - - /** - * Execute a CQL v3 query. - * - * @param hostName hostname or address to connect to - * @param cqlQuery CQL query string - * @return a Thrift CqlResult instance - * @throws ShuffleError - */ - private CqlResult executeCqlQuery(String hostName, String cqlQuery) throws ShuffleError - { - try (CassandraClient client = getThriftClient(hostName)) - { - return client.execute_cql_query(ByteBuffer.wrap(cqlQuery.getBytes()), Compression.NONE); - } - catch (UnavailableException e) - { - throw new ShuffleError(String.format("Unable to write shuffle entries to %s. Reason: UnavailableException", hostName)); - } - catch (TimedOutException e) - { - throw new ShuffleError(String.format("Unable to write shuffle entries to %s. Reason: TimedOutException", hostName)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - /** - * Return a partitioner instance for remote host. - * - * @return an IPartitioner instance - * @throws ShuffleError - */ - private IPartitioner<?> getPartitioner() throws ShuffleError - { - String partitionerName; - try - { - partitionerName = getThriftClient(thriftHost).describe_partitioner(); - } - catch (TException e) - { - throw new ShuffleError(String.format("Thrift request to %s:%d failed: %s", thriftHost, port, e.getMessage())); - } - - try - { - return (IPartitioner<?>) Class.forName(partitionerName).newInstance(); - } - catch (ClassNotFoundException e) - { - throw new ShuffleError("Unable to locate class for partitioner: " + partitionerName); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - /** - * Create and return a CQL batch insert statement for a set of token relocations. - * - * @param tokens tokens to be relocated - * @param partitioner an instance of the IPartitioner in use - * @return a query string - */ - private String createShuffleBatchInsert(Collection<String> tokens, IPartitioner<?> partitioner) - { - StringBuilder query = new StringBuilder(); - query.append("BEGIN BATCH").append("\n"); - - for (String tokenStr : tokens) - { - Token<?> token = partitioner.getTokenFactory().fromString(tokenStr); - String hexToken = ByteBufferUtil.bytesToHex(partitioner.getTokenFactory().toByteArray(token)); - query.append("INSERT INTO system.range_xfers (token_bytes, requested_at) ") - .append("VALUES (").append("0x").append(hexToken).append(", 'now');").append("\n"); - } - - query.append("APPLY BATCH").append("\n"); - return query.toString(); - } - - /** Print usage information. */ - private static void printShuffleHelp() - { - StringBuilder sb = new StringBuilder(); - sb.append("Sub-commands:").append(String.format("%n")); - sb.append(" create Initialize a new shuffle operation").append(String.format("%n")); - sb.append(" ls List pending relocations").append(String.format("%n")); - sb.append(" clear Clear pending relocations").append(String.format("%n")); - sb.append(" en[able] Enable shuffling").append(String.format("%n")); - sb.append(" dis[able] Disable shuffling").append(String.format("%n%n")); - - printHelp("shuffle [options] <sub-command>", sb.toString()); - } - - /** - * Execute. - * - * @param args arguments passed on the command line - * @throws Exception when face meets palm - */ - public static void main(String[] args) throws Exception - { - CommandLine cmd = null; - try - { - cmd = processArguments(args); - } - catch (MissingArgumentException e) - { - System.err.println(e.getMessage()); - System.exit(1); - } - - // Sub command argument. - if (cmd.getArgList().size() < 1) - { - System.err.println("Missing sub-command argument."); - printShuffleHelp(); - System.exit(1); - } - String subCommand = (String)(cmd.getArgList()).get(0); - - String hostName = (cmd.getOptionValue("host") != null) ? cmd.getOptionValue("host") : DEFAULT_HOST; - String port = (cmd.getOptionValue("port") != null) ? cmd.getOptionValue("port") : Integer.toString(DEFAULT_JMX_PORT); - String username = cmd.getOptionValue("username"); - String password = cmd.getOptionValue("password"); - String thriftHost = (cmd.getOptionValue("thrift-host") != null) ? cmd.getOptionValue("thrift-host") : hostName; - String thriftPort = (cmd.getOptionValue("thrift-port") != null) ? cmd.getOptionValue("thrift-port") : "9160"; - String thriftUsername = (cmd.getOptionValue("thrift-user") != null) ? cmd.getOptionValue("thrift-user") : null; - String thriftPassword = (cmd.getOptionValue("thrift-password") != null) ? cmd.getOptionValue("thrift-password") : null; - String onlyDc = cmd.getOptionValue("only-dc"); - boolean thriftFramed = cmd.hasOption("thrift-framed"); - boolean andEnable = cmd.hasOption("and-enable"); - int portNum = -1, thriftPortNum = -1; - - // Parse JMX port number - if (port != null) - { - try - { - portNum = Integer.parseInt(port); - } - catch (NumberFormatException ferr) - { - System.err.printf("%s is not a valid JMX port number.%n", port); - System.exit(1); - } - } - else - { - portNum = DEFAULT_JMX_PORT; - } - - // Parse Thrift port number - if (thriftPort != null) - { - try - { - thriftPortNum = Integer.parseInt(thriftPort); - } - catch (NumberFormatException ferr) - { - System.err.printf("%s is not a valid port number.%n", thriftPort); - System.exit(1); - } - } - else - { - thriftPortNum = 9160; - } - - Shuffle shuffler = new Shuffle(hostName, - portNum, - thriftHost, - thriftPortNum, - thriftFramed, - username, - password, - thriftUsername, - thriftPassword); - - try - { - if (subCommand.equals("create")) - shuffler.shuffle(andEnable, onlyDc); - else if (subCommand.equals("ls")) - shuffler.ls(); - else if (subCommand.startsWith("en")) - shuffler.enable(); - else if (subCommand.startsWith("dis")) - shuffler.disable(); - else if (subCommand.equals("clear")) - shuffler.clear(); - else - { - System.err.println("Unknown subcommand: " + subCommand); - printShuffleHelp(); - System.exit(1); - } - } - catch (ShuffleError err) - { - shuffler.writeln(err); - System.exit(1); - } - finally - { - shuffler.close(); - } - - System.exit(0); - } - - /** A self-contained Cassandra.Client; Closeable. */ - private static class CassandraClient implements Closeable - { - TTransport transport; - Cassandra.Client client; - - CassandraClient(String hostName, int port, boolean framed, String username, String password) throws TException - { - TSocket socket = new TSocket(hostName, port); - transport = (framed) ? socket : new TFastFramedTransport(socket); - transport.open(); - client = new Cassandra.Client(new TBinaryProtocol(transport)); - - if (username != null && password != null) - { - AuthenticationRequest request = new AuthenticationRequest(); - request.putToCredentials("username", username); - request.putToCredentials("password", password); - client.login(request); - } - - client.set_cql_version("3.0.0"); - } - - CqlResult execute_cql_query(ByteBuffer cqlQuery, Compression compression) throws Exception - { - return client.execute_cql3_query(cqlQuery, compression, ConsistencyLevel.ONE); - } - - String describe_partitioner() throws TException - { - return client.describe_partitioner(); - } - - Map<String, String> describe_token_map() throws TException - { - return client.describe_token_map(); - } - - public void close() - { - transport.close(); - } - } - - @SuppressWarnings("serial") - class ShuffleError extends Exception - { - ShuffleError(String msg) - { - super(msg); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml ---------------------------------------------------------------------- diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml index 6467330..6f57846 100644 --- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml +++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml @@ -115,9 +115,6 @@ commands: - name: move <new token> help: | Move node on the token ring to a new token. (for negative tokens, use \\ to escape, Example: move \\-123) - - name: taketoken <token, ...> - help: | - Move the token(s) from the existing owner(s) to this node. For vnodes only. Use \\ to escape negative tokens. - name: removenode status|force|<ID> help: | Show status of current node removal, force completion of pending removal or remove provided ID http://git-wip-us.apache.org/repos/asf/cassandra/blob/440d2360/test/unit/org/apache/cassandra/service/RelocateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RelocateTest.java b/test/unit/org/apache/cassandra/service/RelocateTest.java deleted file mode 100644 index 21dfb96..0000000 --- a/test/unit/org/apache/cassandra/service/RelocateTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import static org.junit.Assert.*; - -import java.math.BigInteger; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.dht.BigIntegerToken; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.RandomPartitioner; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.SimpleSnitch; -import org.apache.cassandra.locator.TokenMetadata; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.util.concurrent.Uninterruptibles; - -public class RelocateTest -{ - private static final int TOKENS_PER_NODE = 256; - private static final int TOKEN_STEP = 10; - private static final IPartitioner<?> partitioner = new RandomPartitioner(); - private static IPartitioner<?> oldPartitioner; - private static VersionedValue.VersionedValueFactory vvFactory; - - private StorageService ss = StorageService.instance; - private TokenMetadata tmd = StorageService.instance.getTokenMetadata(); - - @Before - public void init() - { - tmd.clearUnsafe(); - } - - @BeforeClass - public static void setUp() throws Exception - { - oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner); - SchemaLoader.loadSchema(); - vvFactory = new VersionedValue.VersionedValueFactory(partitioner); - } - - @AfterClass - public static void tearDown() throws Exception - { - StorageService.instance.setPartitionerUnsafe(oldPartitioner); - SchemaLoader.stopGossiper(); - } - - /** Setup a virtual node ring */ - private static Map<Token<?>, InetAddress> createInitialRing(int size) throws UnknownHostException - { - Map<Token<?>, InetAddress> tokenMap = new HashMap<Token<?>, InetAddress>(); - int currentToken = TOKEN_STEP; - - for(int i = 0; i < size; i++) - { - InetAddress endpoint = InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); - Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1); - List<Token> tokens = new ArrayList<Token>(); - - for (int j = 0; j < TOKENS_PER_NODE; j++) - { - Token token = new BigIntegerToken(String.valueOf(currentToken)); - tokenMap.put(token, endpoint); - tokens.add(token); - currentToken += TOKEN_STEP; - } - - Gossiper.instance.injectApplicationState(endpoint, ApplicationState.TOKENS, vvFactory.tokens(tokens)); - StorageService.instance.onChange(endpoint, ApplicationState.STATUS, vvFactory.normal(tokens)); - } - - return tokenMap; - } - - // Copy-pasta from MoveTest.java - private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd) throws ConfigurationException - { - KSMetaData ksmd = Schema.instance.getKSMetaData(keyspaceName); - return AbstractReplicationStrategy.createReplicationStrategy( - keyspaceName, - ksmd.strategyClass, - tmd, - new SimpleSnitch(), - ksmd.strategyOptions); - } - - /** Ensure proper write endpoints during relocation */ - @Test - public void testWriteEndpointsDuringRelocate() throws Exception - { - Map<Token<?>, InetAddress> tokenMap = createInitialRing(5); - Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, List<InetAddress>>(); - - - for (Token<?> token : tokenMap.keySet()) - { - BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5"))); - List<InetAddress> endpoints = new ArrayList<InetAddress>(); - Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), keyToken, false); - while (tokenIter.hasNext()) - { - InetAddress ep = tmd.getEndpoint(tokenIter.next()); - if (!endpoints.contains(ep)) - endpoints.add(ep); - } - expectedEndpoints.put(keyToken, endpoints); - } - - // Relocate the first token from the first endpoint, to the second endpoint. - Token relocateToken = new BigIntegerToken(String.valueOf(TOKEN_STEP)); - ss.onChange( - InetAddress.getByName("127.0.0.2"), - ApplicationState.STATUS, - vvFactory.relocating(Collections.singleton(relocateToken))); - assertTrue(tmd.isRelocating(relocateToken)); - - AbstractReplicationStrategy strategy; - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) - { - strategy = getStrategy(keyspaceName, tmd); - for (Token token : tokenMap.keySet()) - { - BigIntegerToken keyToken = new BigIntegerToken(((BigInteger)token.token).add(new BigInteger("5"))); - - HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(keyToken, keyspaceName, strategy.calculateNaturalEndpoints(keyToken, tmd.cloneOnlyTokenMap()))); - HashSet<InetAddress> expected = new HashSet<InetAddress>(); - - for (int i = 0; i < actual.size(); i++) - expected.add(expectedEndpoints.get(keyToken).get(i)); - - assertEquals("mismatched endpoint sets", expected, actual); - } - } - } - - /** Use STATUS changes to trigger membership update and validate results. */ - @Test - public void testRelocationSuccess() throws UnknownHostException - { - createInitialRing(5); - - // Node handling the relocation (dst), and the token being relocated (src). - InetAddress relocator = InetAddress.getByName("127.0.0.3"); - Token relocatee = new BigIntegerToken(String.valueOf(TOKEN_STEP)); - - // Send RELOCATING and ensure token status - ss.onChange(relocator, ApplicationState.STATUS, vvFactory.relocating(Collections.singleton(relocatee))); - assertTrue(tmd.isRelocating(relocatee)); - - // Create a list of the endpoint's existing tokens, and add the relocatee to it. - List<Token> tokens = new ArrayList<Token>(tmd.getTokens(relocator)); - SystemKeyspace.updateTokens(tokens); - tokens.add(relocatee); - - // Send a normal status, then ensure all is copesetic. - Gossiper.instance.injectApplicationState(relocator, ApplicationState.TOKENS, vvFactory.tokens(tokens)); - ss.onChange(relocator, ApplicationState.STATUS, vvFactory.normal(tokens)); - - // Relocating entries are removed after RING_DELAY - Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY + 10, TimeUnit.MILLISECONDS); - - assertTrue(!tmd.isRelocating(relocatee)); - assertEquals(tmd.getEndpoint(relocatee), relocator); - } -}