This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a93bb228d0a625e73d29733354f7fa59671b6813
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 2 19:47:53 2023 +0000

    [CEP-21] Remove TokenMetadata (6/7)

    Part 6 of 7

    Completely remove TokenMetadata, the intention is to bring it back in a
    stripped down form, available to tests only, so we can continue to verify
    equivalence between old and new code. Test code is still extremely broken
    at this point, but non-test code is buildable again, though almost
    certainly not actually runnable.

    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../apache/cassandra/locator/TokenMetadata.java    | 1626 --------------------
 .../locator/TokenMetadataDiagnostics.java          |   46 -
 .../cassandra/locator/TokenMetadataEvent.java      |   62 -
 .../apache/cassandra/service/StorageService.java   |    1 -
 .../apache/cassandra/stress/CompactionStress.java  |   18 +-
 5 files changed, 11 insertions(+), 1742 deletions(-)

diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
deleted file mode 100644
index abbc76d482..0000000000
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ /dev/null
@@ -1,1626 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.cassandra.locator; - -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import javax.annotation.concurrent.GuardedBy; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.tcm.compatibility.AsEndpoints; -import org.apache.cassandra.tcm.compatibility.AsLocations; -import org.apache.cassandra.tcm.compatibility.AsTokenMap; -import org.apache.cassandra.tcm.compatibility.TokenRingUtils; -import org.apache.cassandra.tcm.membership.Location; -import org.apache.cassandra.tcm.membership.NodeId; -import org.apache.cassandra.utils.BiMultiValMap; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.SortedBiMultiValMap; - -import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR; -import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; - -public class TokenMetadata implements AsEndpoints, AsLocations, AsTokenMap -{ - private static final Logger logger = LoggerFactory.getLogger(TokenMetadata.class); - - /** - * Maintains token to endpoint map of every node in the cluster. - * Each Token is associated with exactly one Address, but each Address may have - * multiple tokens. Hence, the BiMultiValMap collection. - */ - private final BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap; - - /** Maintains endpoint to host ID map of every node in the cluster */ - private final BiMap<InetAddressAndPort, UUID> endpointToHostIdMap; - - // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddressAndPort> pendingRanges<tt>, - // which was added to when a node began bootstrap and removed from when it finished. - // - // This is inadequate when multiple changes are allowed simultaneously. For example, - // suppose that there is a ring of nodes A, C and E, with replication factor 3. - // Node D bootstraps between C and E, so its pending ranges will be E-A, A-C and C-D. - // Now suppose node B bootstraps between A and C at the same time. Its pending ranges - // would be C-E, E-A and A-B. Now both nodes need to be assigned pending range E-A, - // which we would be unable to represent with the old Map. The same thing happens - // even more obviously for any nodes that boot simultaneously between same two nodes. - // - // So, we made two changes: - // - // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddressAndPort></tt> (now - // <tt>Map<String, Multimap<Range, InetAddressAndPort>></tt>, because replication strategy - // and options are per-KeySpace). 
- // - // Second, we added the bootstrapTokens and leavingEndpoints collections, so we can - // rebuild pendingRanges from the complete information of what is going on, when - // additional changes are made mid-operation. - // - // Finally, note that recording the tokens of joining nodes in bootstrapTokens also - // means we can detect and reject the addition of multiple nodes at the same token - // before one becomes part of the ring. - private final BiMultiValMap<Token, InetAddressAndPort> bootstrapTokens = new BiMultiValMap<>(); - - private final BiMap<InetAddressAndPort, InetAddressAndPort> replacementToOriginal = HashBiMap.create(); - - // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving) - private final Set<InetAddressAndPort> leavingEndpoints = new HashSet<>(); - // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints} - // NOTE: this may contain ranges that conflict with the those implied by sortedTokens when a range is changing its transient status - private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<String, PendingRangeMaps>(); - - // nodes which are migrating to the new tokens in the ring - private final Set<Pair<Token, InetAddressAndPort>> movingEndpoints = new HashSet<>(); - - /* Use this lock for manipulating the token map */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(true); - private volatile ArrayList<Token> sortedTokens; // safe to be read without a lock, as it's never mutated - - private volatile Topology topology; - - public final IPartitioner partitioner; - - // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership - @GuardedBy("lock") - private long ringVersion = 0; - - public TokenMetadata() - { - this(SortedBiMultiValMap.create(), - HashBiMap.create(), - Topology.empty(), - DatabaseDescriptor.getPartitioner()); - } - - public TokenMetadata(IEndpointSnitch snitch) - { - this(SortedBiMultiValMap.create(), - HashBiMap.create(), - Topology.builder(() -> snitch).build(), - DatabaseDescriptor.getPartitioner()); - } - - private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner) - { - this(tokenToEndpointMap, endpointsMap, topology, partitioner, 0); - } - - private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion) - { - this.tokenToEndpointMap = tokenToEndpointMap; - this.topology = topology; - this.partitioner = partitioner; - endpointToHostIdMap = endpointsMap; - sortedTokens = sortTokens(); - this.ringVersion = ringVersion; - } - - /** - * To be used by tests only (via {@link org.apache.cassandra.service.StorageService#setPartitionerUnsafe}). 
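[Aside on the pendingRanges comment above, where two simultaneous bootstraps (D between C and E, B between A and C) both need pending range E-A: a self-contained sketch, not part of this commit, using plain strings in place of Cassandra's Range/Token/InetAddressAndPort types and Guava's Multimap, showing the case a plain Map cannot represent.]

    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.Multimap;
    import java.util.HashMap;
    import java.util.Map;

    public class PendingRangeSketch
    {
        public static void main(String[] args)
        {
            // Old shape: the second bootstrap silently overwrites the first
            // pending owner of range (E, A].
            Map<String, String> asMap = new HashMap<>();
            asMap.put("(E, A]", "D"); // D bootstrapping between C and E
            asMap.put("(E, A]", "B"); // B bootstrapping between A and C
            System.out.println(asMap.get("(E, A]"));      // B only

            // Multimap shape: both pending owners of (E, A] are retained.
            Multimap<String, String> asMultimap = HashMultimap.create();
            asMultimap.put("(E, A]", "D");
            asMultimap.put("(E, A]", "B");
            System.out.println(asMultimap.get("(E, A]")); // [D, B] (order not guaranteed)
        }
    }
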
- */ - @VisibleForTesting - public TokenMetadata cloneWithNewPartitioner(IPartitioner newPartitioner) - { - return new TokenMetadata(tokenToEndpointMap, endpointToHostIdMap, topology, newPartitioner); - } - - private ArrayList<Token> sortTokens() - { - return new ArrayList<>(tokenToEndpointMap.keySet()); - } - - /** @return the number of nodes bootstrapping into source's primary range */ - public int pendingRangeChanges(InetAddressAndPort source) - { - int n = 0; - Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source)); - lock.readLock().lock(); - try - { - for (Token token : bootstrapTokens.keySet()) - for (Range<Token> range : sourceRanges) - if (range.contains(token)) - n++; - } - finally - { - lock.readLock().unlock(); - } - return n; - } - - /** - * Update token map with a single token/endpoint pair in normal state. - */ - public void updateNormalToken(Token token, InetAddressAndPort endpoint) - { - updateNormalTokens(Collections.singleton(token), endpoint); - } - - public void updateNormalTokens(Collection<Token> tokens, InetAddressAndPort endpoint) - { - Multimap<InetAddressAndPort, Token> endpointTokens = HashMultimap.create(); - endpointTokens.putAll(endpoint, tokens); - updateNormalTokens(endpointTokens); - } - - /** - * Update token map with a set of token/endpoint pairs in normal state. - * - * Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple) - * is expensive (CASSANDRA-3831). - */ - public void updateNormalTokens(Multimap<InetAddressAndPort, Token> endpointTokens) - { - if (endpointTokens.isEmpty()) - return; - - lock.writeLock().lock(); - try - { - boolean shouldSortTokens = false; - Topology.Builder topologyBuilder = topology.unbuild(); - for (InetAddressAndPort endpoint : endpointTokens.keySet()) - { - Collection<Token> tokens = endpointTokens.get(endpoint); - - assert tokens != null && !tokens.isEmpty(); - - bootstrapTokens.removeValue(endpoint); - tokenToEndpointMap.removeValue(endpoint); - topologyBuilder.addEndpoint(endpoint); - leavingEndpoints.remove(endpoint); - replacementToOriginal.remove(endpoint); - removeFromMoving(endpoint); // also removing this endpoint from moving - - for (Token token : tokens) - { - InetAddressAndPort prev = tokenToEndpointMap.put(token, endpoint); - if (!endpoint.equals(prev)) - { - if (prev != null) - logger.warn("Token {} changing ownership from {} to {}", token, prev, endpoint); - shouldSortTokens = true; - } - } - } - topology = topologyBuilder.build(); - - if (shouldSortTokens) - sortedTokens = sortTokens(); - } - finally - { - lock.writeLock().unlock(); - } - } - - /** - * Store an end-point to host ID mapping. Each ID must be unique, and - * cannot be changed after the fact. 
- */ - public void updateHostId(UUID hostId, InetAddressAndPort endpoint) - { - assert hostId != null; - assert endpoint != null; - - lock.writeLock().lock(); - try - { - updateEndpointToHostIdMap(hostId, endpoint); - } - finally - { - lock.writeLock().unlock(); - } - - } - - public void updateHostIds(Map<UUID, InetAddressAndPort> hostIdToEndpointMap) - { - lock.writeLock().lock(); - try - { - for (Map.Entry<UUID, InetAddressAndPort> entry : hostIdToEndpointMap.entrySet()) - { - updateEndpointToHostIdMap(entry.getKey(), entry.getValue()); - } - } - finally - { - lock.writeLock().unlock(); - } - - } - - private void updateEndpointToHostIdMap(UUID hostId, InetAddressAndPort endpoint) - { - InetAddressAndPort storedEp = endpointToHostIdMap.inverse().get(hostId); - if (storedEp != null) - { - if (!storedEp.equals(endpoint) && (FailureDetector.instance.isAlive(storedEp))) - { - throw new RuntimeException(String.format("Host ID collision between active endpoint %s and %s (id=%s)", - storedEp, - endpoint, - hostId)); - } - } - - UUID storedId = endpointToHostIdMap.get(endpoint); - if ((storedId != null) && (!storedId.equals(hostId))) - logger.warn("Changing {}'s host ID from {} to {}", endpoint, storedId, hostId); - - endpointToHostIdMap.forcePut(endpoint, hostId); - } - - /** Return the unique host ID for an end-point. */ - public UUID getHostId(InetAddressAndPort endpoint) - { - lock.readLock().lock(); - try - { - return endpointToHostIdMap.get(endpoint); - } - finally - { - lock.readLock().unlock(); - } - } - - /** Return the end-point for a unique host ID */ - public InetAddressAndPort getEndpointForHostId(UUID hostId) - { - lock.readLock().lock(); - try - { - return endpointToHostIdMap.inverse().get(hostId); - } - finally - { - lock.readLock().unlock(); - } - } - - /** @return a copy of the endpoint-to-id map for read-only operations */ - public Map<InetAddressAndPort, UUID> getEndpointToHostIdMapForReading() - { - lock.readLock().lock(); - try - { - Map<InetAddressAndPort, UUID> readMap = new HashMap<>(); - readMap.putAll(endpointToHostIdMap); - return readMap; - } - finally - { - lock.readLock().unlock(); - } - } - - @Deprecated - public void addBootstrapToken(Token token, InetAddressAndPort endpoint) - { - addBootstrapTokens(Collections.singleton(token), endpoint); - } - - public void addBootstrapTokens(Collection<Token> tokens, InetAddressAndPort endpoint) - { - addBootstrapTokens(tokens, endpoint, null); - } - - private void addBootstrapTokens(Collection<Token> tokens, InetAddressAndPort endpoint, InetAddressAndPort original) - { - assert tokens != null && !tokens.isEmpty(); - assert endpoint != null; - - lock.writeLock().lock(); - try - { - - InetAddressAndPort oldEndpoint; - - for (Token token : tokens) - { - oldEndpoint = bootstrapTokens.get(token); - if (oldEndpoint != null && !oldEndpoint.equals(endpoint)) - throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); - - oldEndpoint = tokenToEndpointMap.get(token); - if (oldEndpoint != null && !oldEndpoint.equals(endpoint) && !oldEndpoint.equals(original)) - throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token); - } - - bootstrapTokens.removeValue(endpoint); - - for (Token token : tokens) - bootstrapTokens.put(token, endpoint); - } - finally - { - lock.writeLock().unlock(); - } - } - - public void addReplaceTokens(Collection<Token> replacingTokens, InetAddressAndPort newNode, InetAddressAndPort oldNode) - 
{ - assert replacingTokens != null && !replacingTokens.isEmpty(); - assert newNode != null && oldNode != null; - - lock.writeLock().lock(); - try - { - Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode); - if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens)) - { - throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " + - "different set of tokens %s.", newNode, oldNode, oldNodeTokens, - replacingTokens)); - } - - logger.debug("Replacing {} with {}", newNode, oldNode); - replacementToOriginal.put(newNode, oldNode); - - addBootstrapTokens(replacingTokens, newNode, oldNode); - } - finally - { - lock.writeLock().unlock(); - } - } - - public Optional<InetAddressAndPort> getReplacementNode(InetAddressAndPort endpoint) - { - lock.readLock().lock(); - try - { - return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint)); - } - finally - { - lock.readLock().unlock(); - } - } - - public Optional<InetAddressAndPort> getReplacingNode(InetAddressAndPort endpoint) - { - lock.readLock().lock(); - try - { - return Optional.ofNullable((replacementToOriginal.get(endpoint))); - } - finally - { - lock.readLock().unlock(); - } - } - - public void removeBootstrapTokens(Collection<Token> tokens) - { - assert tokens != null && !tokens.isEmpty(); - - lock.writeLock().lock(); - try - { - for (Token token : tokens) - bootstrapTokens.remove(token); - } - finally - { - lock.writeLock().unlock(); - } - } - - public void addLeavingEndpoint(InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.writeLock().lock(); - try - { - leavingEndpoints.add(endpoint); - } - finally - { - lock.writeLock().unlock(); - } - } - - /** - * Add a new moving endpoint - * @param token token which is node moving to - * @param endpoint address of the moving node - */ - public void addMovingEndpoint(Token token, InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.writeLock().lock(); - try - { - movingEndpoints.add(Pair.create(token, endpoint)); - } - finally - { - lock.writeLock().unlock(); - } - } - - public void removeEndpoint(InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.writeLock().lock(); - try - { - bootstrapTokens.removeValue(endpoint); - - topology = topology.unbuild().removeEndpoint(endpoint).build(); - leavingEndpoints.remove(endpoint); - if (replacementToOriginal.remove(endpoint) != null) - { - logger.debug("Node {} failed during replace.", endpoint); - } - endpointToHostIdMap.remove(endpoint); - Collection<Token> removedTokens = tokenToEndpointMap.removeValue(endpoint); - if (removedTokens != null && !removedTokens.isEmpty()) - { - sortedTokens = sortTokens(); - invalidateCachedRingsUnsafe(); - } - } - finally - { - lock.writeLock().unlock(); - } - } - - /** - * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238. - */ - public Topology updateTopology(InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.writeLock().lock(); - try - { - logger.info("Updating topology for {}", endpoint); - topology = topology.unbuild().updateEndpoint(endpoint).build(); - invalidateCachedRingsUnsafe(); - return topology; - } - finally - { - lock.writeLock().unlock(); - } - } - - /** - * This is called when the snitch properties for many endpoints are updated, it will update - * the topology mappings of any endpoints whose snitch has changed, see CASSANDRA-10238. 
- */ - public Topology updateTopology() - { - lock.writeLock().lock(); - try - { - logger.info("Updating topology for all endpoints that have changed"); - topology = topology.unbuild().updateEndpoints().build(); - invalidateCachedRingsUnsafe(); - return topology; - } - finally - { - lock.writeLock().unlock(); - } - } - - /** - * Remove pair of token/address from moving endpoints - * @param endpoint address of the moving node - */ - public void removeFromMoving(InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.writeLock().lock(); - try - { - for (Pair<Token, InetAddressAndPort> pair : movingEndpoints) - { - if (pair.right.equals(endpoint)) - { - movingEndpoints.remove(pair); - break; - } - } - - invalidateCachedRingsUnsafe(); - } - finally - { - lock.writeLock().unlock(); - } - } - - public Collection<Token> getTokens(InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.readLock().lock(); - try - { - assert isMember(endpoint): String.format("Unable to get tokens for %s; it is not a member", endpoint); // don't want to return nulls - return new ArrayList<>(tokenToEndpointMap.inverse().get(endpoint)); - } - finally - { - lock.readLock().unlock(); - } - } - - @Deprecated - public Token getToken(InetAddressAndPort endpoint) - { - return getTokens(endpoint).iterator().next(); - } - - public boolean isMember(InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.readLock().lock(); - try - { - return tokenToEndpointMap.inverse().containsKey(endpoint); - } - finally - { - lock.readLock().unlock(); - } - } - - public boolean isLeaving(InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.readLock().lock(); - try - { - return leavingEndpoints.contains(endpoint); - } - finally - { - lock.readLock().unlock(); - } - } - - public boolean isMoving(InetAddressAndPort endpoint) - { - assert endpoint != null; - - lock.readLock().lock(); - try - { - for (Pair<Token, InetAddressAndPort> pair : movingEndpoints) - { - if (pair.right.equals(endpoint)) - return true; - } - - return false; - } - finally - { - lock.readLock().unlock(); - } - } - - private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<>(); - - /** - * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges, - * bootstrap tokens and leaving endpoints are not included in the copy. - */ - public TokenMetadata cloneOnlyTokenMap() - { - lock.readLock().lock(); - try - { - return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap), - HashBiMap.create(endpointToHostIdMap), - topology, - partitioner, - ringVersion); - } - finally - { - lock.readLock().unlock(); - } - } - - /** - * Return a cached TokenMetadata with only tokenToEndpointMap, i.e., the same as cloneOnlyTokenMap but - * uses a cached copy that is invalided when the ring changes, so in the common case - * no extra locking is required. - * - * Callers must *NOT* mutate the returned metadata object. - */ - public TokenMetadata cachedOnlyTokenMap() - { - TokenMetadata tm = cachedTokenMap.get(); - if (tm != null) - return tm; - - // synchronize to prevent thundering herd (CASSANDRA-6345) - synchronized (this) - { - if ((tm = cachedTokenMap.get()) != null) - return tm; - - tm = cloneOnlyTokenMap(); - cachedTokenMap.set(tm); - return tm; - } - } - - /** - * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all - * current leave operations have finished. 
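[Aside on cachedOnlyTokenMap() above, which pairs an AtomicReference with a synchronized double-check so only one caller rebuilds the snapshot after a ring change (the CASSANDRA-6345 thundering-herd concern): a stripped-down sketch of the same pattern with a generic, hypothetical payload standing in for the cloned TokenMetadata.]

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    // Hypothetical stand-in for the cachedTokenMap/cloneOnlyTokenMap pair above:
    // an expensive-to-build snapshot cached until the next ring change.
    final class CachedSnapshot<T>
    {
        private final AtomicReference<T> cached = new AtomicReference<>();
        private final Supplier<T> expensiveCopy; // plays the role of cloneOnlyTokenMap()

        CachedSnapshot(Supplier<T> expensiveCopy)
        {
            this.expensiveCopy = expensiveCopy;
        }

        T get()
        {
            T snapshot = cached.get();
            if (snapshot != null)
                return snapshot;

            // Synchronize so only one cold-cache caller pays for the copy,
            // avoiding the thundering herd noted against CASSANDRA-6345 above.
            synchronized (this)
            {
                if ((snapshot = cached.get()) != null)
                    return snapshot;
                snapshot = expensiveCopy.get();
                cached.set(snapshot);
                return snapshot;
            }
        }

        // Analogous to invalidateCachedRingsUnsafe(): drop the snapshot on ring changes.
        void invalidate()
        {
            cached.set(null);
        }
    }
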
- * - * @return new token metadata - */ - public TokenMetadata cloneAfterAllLeft() - { - lock.readLock().lock(); - try - { - return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints); - } - finally - { - lock.readLock().unlock(); - } - } - - private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddressAndPort> leavingEndpoints) - { - for (InetAddressAndPort endpoint : leavingEndpoints) - allLeftMetadata.removeEndpoint(endpoint); - - return allLeftMetadata; - } - - /** - * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all - * current leave, and move operations have finished. - * - * @return new token metadata - */ - public TokenMetadata cloneAfterAllSettled() - { - lock.readLock().lock(); - try - { - TokenMetadata metadata = cloneOnlyTokenMap(); - - for (InetAddressAndPort endpoint : leavingEndpoints) - metadata.removeEndpoint(endpoint); - - - for (Pair<Token, InetAddressAndPort> pair : movingEndpoints) - metadata.updateNormalToken(pair.left, pair.right); - - return metadata; - } - finally - { - lock.readLock().unlock(); - } - } - - public InetAddressAndPort getEndpoint(Token token) - { - lock.readLock().lock(); - try - { - return tokenToEndpointMap.get(token); - } - finally - { - lock.readLock().unlock(); - } - } - - public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens) - { - return TokenRingUtils.getPrimaryRangesFor(sortedTokens(), tokens); - } - - @Deprecated - public Range<Token> getPrimaryRangeFor(Token right) - { - return getPrimaryRangesFor(Arrays.asList(right)).iterator().next(); - } - - public ArrayList<Token> sortedTokens() - { - return sortedTokens; - } - - public EndpointsByRange getPendingRangesMM(String keyspaceName) - { - EndpointsByRange.Builder byRange = new EndpointsByRange.Builder(); - PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); - - if (pendingRangeMaps != null) - { - for (Map.Entry<Range<Token>, EndpointsForRange.Builder> entry : pendingRangeMaps) - { - byRange.putAll(entry.getKey(), entry.getValue(), Conflict.ALL); - } - } - - return byRange.build(); - } - - /** a mutable map may be returned but caller should not modify it */ - public PendingRangeMaps getPendingRanges(String keyspaceName) - { - return this.pendingRanges.get(keyspaceName); - } - - public RangesAtEndpoint getPendingRanges(String keyspaceName, InetAddressAndPort endpoint) - { - RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint); - for (Map.Entry<Range<Token>, Replica> entry : getPendingRangesMM(keyspaceName).flattenEntries()) - { - Replica replica = entry.getValue(); - if (replica.endpoint().equals(endpoint)) - { - builder.add(replica, Conflict.DUPLICATE); - } - } - return builder.build(); - } - - /** - * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is: - * - * (1) When in doubt, it is better to write too much to a node than too little. That is, if - * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning - * up unneeded data afterwards is better than missing writes during movement. - * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional - * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore - * we will first remove _all_ leaving tokens for the sake of calculation and then check what - * ranges would go where if all nodes are to leave. 
This way we get the biggest possible - * ranges with regard current leave operations, covering all subsets of possible final range - * values. - * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing - * complex calculations to see if multiple bootstraps overlap, we simply base calculations - * on the same token ring used before (reflecting situation after all leave operations have - * completed). Bootstrapping nodes will be added and removed one by one to that metadata and - * checked what their ranges would be. This will give us the biggest possible ranges the - * node could have. It might be that other bootstraps make our actual final ranges smaller, - * but it does not matter as we can clean up the data afterwards. - * - * NOTE: This is heavy and ineffective operation. This will be done only once when a node - * changes state in the cluster, so it should be manageable. - */ - public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName) - { - // avoid race between both branches - do not use a lock here as this will block any other unrelated operations! - long startedAt = currentTimeMillis(); - synchronized (pendingRanges) - { - TokenMetadataDiagnostics.pendingRangeCalculationStarted(this, keyspaceName); - - unsafeCalculatePendingRanges(strategy, keyspaceName); - - if (logger.isDebugEnabled()) - logger.debug("Starting pending range calculation for {}", keyspaceName); - - long took = currentTimeMillis() - startedAt; - - if (logger.isDebugEnabled()) - logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took); - if (logger.isTraceEnabled()) - logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges())); - } - } - - public void unsafeCalculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName) - { - // create clone of current state - BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone; - Set<InetAddressAndPort> leavingEndpointsClone; - Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone; - TokenMetadata metadata; - - lock.readLock().lock(); - try - { - - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) - { - if (logger.isTraceEnabled()) - logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) - { - if (logger.isTraceEnabled()) - logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); - pendingRanges.put(keyspaceName, new PendingRangeMaps()); - - return; - } - } - - bootstrapTokensClone = new BiMultiValMap<>(this.bootstrapTokens); - leavingEndpointsClone = new HashSet<>(this.leavingEndpoints); - movingEndpointsClone = new HashSet<>(this.movingEndpoints); - metadata = this.cloneOnlyTokenMap(); - } - finally - { - lock.readLock().unlock(); - } - - pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone, - leavingEndpointsClone, movingEndpointsClone)); - } - - /** - * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String) - */ - private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy, - TokenMetadata metadata, - BiMultiValMap<Token, InetAddressAndPort> bootstrapTokens, - Set<InetAddressAndPort> leavingEndpoints, - Set<Pair<Token, InetAddressAndPort>> movingEndpoints) - 
{ - PendingRangeMaps newPendingRanges = new PendingRangeMaps(); - - RangesByEndpoint addressRanges = strategy.getAddressReplicas(metadata); - - // Copy of metadata reflecting the situation after all leave operations are finished. - TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints); - - // get all ranges that will be affected by leaving nodes - Set<Range<Token>> removeAffectedRanges = new HashSet<>(); - for (InetAddressAndPort endpoint : leavingEndpoints) - removeAffectedRanges.addAll(addressRanges.get(endpoint).ranges()); - - // for each of those ranges, find what new nodes will be responsible for the range when - // all leaving nodes are gone. - for (Range<Token> range : removeAffectedRanges) - { - EndpointsForRange currentReplicas = strategy.calculateNaturalReplicas(range.right, metadata); - EndpointsForRange newReplicas = strategy.calculateNaturalReplicas(range.right, allLeftMetadata); - for (Replica newReplica : newReplicas) - { - if (currentReplicas.endpoints().contains(newReplica.endpoint())) - continue; - - // we calculate pending replicas for leave- and move- affected ranges in the same way to avoid - // a possible conflict when 2 pending replicas have the same endpoint and different ranges. - for (Replica pendingReplica : newReplica.subtractSameReplication(addressRanges.get(newReplica.endpoint()))) - newPendingRanges.addPendingRange(range, pendingReplica); - } - } - - // At this stage newPendingRanges has been updated according to leave operations. We can - // now continue the calculation by checking bootstrapping nodes. - - // For each of the bootstrapping nodes, simply add to the allLeftMetadata and check what their - // ranges would be. We actually need to clone allLeftMetadata each time as resetting its state - // after getting the new pending ranges is not as simple as just removing the bootstrapping - // endpoint. If the bootstrapping endpoint constitutes a replacement, removing it after checking - // the newly pending ranges means there are now fewer endpoints that there were originally and - // causes its next neighbour to take over its primary range which affects the next RF endpoints - // in the ring. - Multimap<InetAddressAndPort, Token> bootstrapAddresses = bootstrapTokens.inverse(); - for (InetAddressAndPort endpoint : bootstrapAddresses.keySet()) - { - Collection<Token> tokens = bootstrapAddresses.get(endpoint); - TokenMetadata cloned = allLeftMetadata.cloneOnlyTokenMap(); - cloned.updateNormalTokens(tokens, endpoint); - for (Replica replica : strategy.getAddressReplicas(cloned, endpoint)) - { - newPendingRanges.addPendingRange(replica.range(), replica); - } - } - - // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving nodes. - - // For each of the moving nodes, we do the same thing we did for bootstrapping: - // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. - for (Pair<Token, InetAddressAndPort> moving : movingEndpoints) - { - //Calculate all the ranges which will could be affected. This will include the ranges before and after the move. 
- Set<Replica> moveAffectedReplicas = new HashSet<>(); - InetAddressAndPort endpoint = moving.right; // address of the moving node - //Add ranges before the move - for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint)) - { - moveAffectedReplicas.add(replica); - } - - allLeftMetadata.updateNormalToken(moving.left, endpoint); - //Add ranges after the move - for (Replica replica : strategy.getAddressReplicas(allLeftMetadata, endpoint)) - { - moveAffectedReplicas.add(replica); - } - - for (Replica replica : moveAffectedReplicas) - { - Set<InetAddressAndPort> currentEndpoints = strategy.calculateNaturalReplicas(replica.range().right, metadata).endpoints(); - Set<InetAddressAndPort> newEndpoints = strategy.calculateNaturalReplicas(replica.range().right, allLeftMetadata).endpoints(); - Set<InetAddressAndPort> difference = Sets.difference(newEndpoints, currentEndpoints); - for (final InetAddressAndPort address : difference) - { - RangesAtEndpoint newReplicas = strategy.getAddressReplicas(allLeftMetadata, address); - RangesAtEndpoint oldReplicas = strategy.getAddressReplicas(metadata, address); - - // Filter out the things that are already replicated - newReplicas = newReplicas.filter(r -> !oldReplicas.contains(r)); - for (Replica newReplica : newReplicas) - { - // for correctness on write, we need to treat ranges that are becoming full differently - // to those that are presently transient; however reads must continue to use the current view - // for ranges that are becoming transient. We could choose to ignore them here, but it's probably - // cleaner to ensure this is dealt with at point of use, where we can make a conscious decision - // about which to use - for (Replica pendingReplica : newReplica.subtractSameReplication(oldReplicas)) - { - newPendingRanges.addPendingRange(pendingReplica.range(), pendingReplica); - } - } - } - } - - allLeftMetadata.removeEndpoint(endpoint); - } - - return newPendingRanges; - } - - private String tokenToEndpointMapKeysAsStrings() - { - lock.readLock().lock(); - try - { - return StringUtils.join(tokenToEndpointMap.keySet(), ", "); - } - finally - { - lock.readLock().unlock(); - } - } - - /** @return a copy of the bootstrapping tokens map */ - public BiMultiValMap<Token, InetAddressAndPort> getBootstrapTokens() - { - lock.readLock().lock(); - try - { - return new BiMultiValMap<>(bootstrapTokens); - } - finally - { - lock.readLock().unlock(); - } - } - - public Set<InetAddressAndPort> getAllEndpoints() - { - lock.readLock().lock(); - try - { - return ImmutableSet.copyOf(endpointToHostIdMap.keySet()); - } - finally - { - lock.readLock().unlock(); - } - } - - public int getSizeOfAllEndpoints() - { - lock.readLock().lock(); - try - { - return endpointToHostIdMap.size(); - } - finally - { - lock.readLock().unlock(); - } - } - - public Set<InetAddressAndPort> getAllMembers() - { - return getAllEndpoints().stream() - .filter(this::isMember) - .collect(Collectors.toSet()); - } - - /** caller should not modify leavingEndpoints */ - public Set<InetAddressAndPort> getLeavingEndpoints() - { - lock.readLock().lock(); - try - { - return ImmutableSet.copyOf(leavingEndpoints); - } - finally - { - lock.readLock().unlock(); - } - } - - public int getSizeOfLeavingEndpoints() - { - lock.readLock().lock(); - try - { - return leavingEndpoints.size(); - } - finally - { - lock.readLock().unlock(); - } - } - - /** - * Endpoints which are migrating to the new tokens - * @return set of addresses of moving endpoints - */ - public Set<Pair<Token, InetAddressAndPort>> 
getMovingEndpoints() - { - lock.readLock().lock(); - try - { - return ImmutableSet.copyOf(movingEndpoints); - } - finally - { - lock.readLock().unlock(); - } - } - - public int getSizeOfMovingEndpoints() - { - lock.readLock().lock(); - try - { - return movingEndpoints.size(); - } - finally - { - lock.readLock().unlock(); - } - } - - /** used by tests */ - public void clearUnsafe() - { - lock.writeLock().lock(); - try - { - tokenToEndpointMap.clear(); - endpointToHostIdMap.clear(); - bootstrapTokens.clear(); - leavingEndpoints.clear(); - pendingRanges.clear(); - movingEndpoints.clear(); - sortedTokens.clear(); - topology = Topology.empty(); - invalidateCachedRingsUnsafe(); - } - finally - { - lock.writeLock().unlock(); - } - } - - public String toString() - { - StringBuilder sb = new StringBuilder(); - lock.readLock().lock(); - try - { - Multimap<InetAddressAndPort, Token> endpointToTokenMap = tokenToEndpointMap.inverse(); - Set<InetAddressAndPort> eps = endpointToTokenMap.keySet(); - - if (!eps.isEmpty()) - { - sb.append("Normal Tokens:"); - sb.append(LINE_SEPARATOR.getString()); - for (InetAddressAndPort ep : eps) - { - sb.append(ep); - sb.append(':'); - sb.append(endpointToTokenMap.get(ep)); - sb.append(LINE_SEPARATOR.getString()); - } - } - - if (!bootstrapTokens.isEmpty()) - { - sb.append("Bootstrapping Tokens:" ); - sb.append(LINE_SEPARATOR.getString()); - for (Map.Entry<Token, InetAddressAndPort> entry : bootstrapTokens.entrySet()) - { - sb.append(entry.getValue()).append(':').append(entry.getKey()); - sb.append(LINE_SEPARATOR.getString()); - } - } - - if (!leavingEndpoints.isEmpty()) - { - sb.append("Leaving Endpoints:"); - sb.append(LINE_SEPARATOR.getString()); - for (InetAddressAndPort ep : leavingEndpoints) - { - sb.append(ep); - sb.append(LINE_SEPARATOR.getString()); - } - } - - if (!pendingRanges.isEmpty()) - { - sb.append("Pending Ranges:"); - sb.append(LINE_SEPARATOR.getString()); - sb.append(printPendingRanges()); - } - } - finally - { - lock.readLock().unlock(); - } - - return sb.toString(); - } - - private String printPendingRanges() - { - StringBuilder sb = new StringBuilder(); - - for (PendingRangeMaps pendingRangeMaps : pendingRanges.values()) - { - sb.append(pendingRangeMaps.printPendingRanges()); - } - - return sb.toString(); - } - - public EndpointsForToken pendingEndpointsForToken(Token token, String keyspaceName) - { - PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); - if (pendingRangeMaps == null) - return EndpointsForToken.empty(token); - - return pendingRangeMaps.pendingEndpointsFor(token); - } - - /** - * @deprecated retained for benefit of old tests - */ - @Deprecated - public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural) - { - EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName); - return ReplicaLayout.forTokenWrite(Keyspace.open(keyspaceName).getReplicationStrategy(), natural, pending).all(); - } - - /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */ - public Multimap<InetAddressAndPort, Token> getEndpointToTokenMapForReading() - { - lock.readLock().lock(); - try - { - Multimap<InetAddressAndPort, Token> cloned = HashMultimap.create(); - for (Map.Entry<Token, InetAddressAndPort> entry : tokenToEndpointMap.entrySet()) - cloned.put(entry.getValue(), entry.getKey()); - return cloned; - } - finally - { - lock.readLock().unlock(); - } - } - - /** - * @return a (stable copy, won't be modified) Token to Endpoint map for all the 
normal and bootstrapping nodes - * in the cluster. - */ - public Map<Token, InetAddressAndPort> getNormalAndBootstrappingTokenToEndpointMap() - { - lock.readLock().lock(); - try - { - Map<Token, InetAddressAndPort> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size()); - map.putAll(tokenToEndpointMap); - map.putAll(bootstrapTokens); - return map; - } - finally - { - lock.readLock().unlock(); - } - } - - /** - * @return a (stable copy, won't be modified) datacenter to Endpoint map for all the nodes in the cluster. - */ - public ImmutableMultimap<String, InetAddressAndPort> getDC2AllEndpoints(IEndpointSnitch snitch) - { - return Multimaps.index(getAllEndpoints(), snitch::getDatacenter); - } - - /** - * @return the Topology map of nodes to DCs + Racks - * - * This is only allowed when a copy has been made of TokenMetadata, to avoid concurrent modifications - * when Topology methods are subsequently used by the caller. - */ - public Topology getTopology() - { - assert !DatabaseDescriptor.isDaemonInitialized() || this != StorageService.instance.getTokenMetadata(); - return topology; - } - - public long getRingVersion() - { - lock.readLock().lock(); - - try - { - return ringVersion; - } - finally - { - lock.readLock().unlock(); - } - } - - public void invalidateCachedRings() - { - lock.writeLock().lock(); - - try - { - invalidateCachedRingsUnsafe(); - } - finally - { - lock.writeLock().unlock(); - } - } - - private void invalidateCachedRingsUnsafe() - { - ringVersion++; - cachedTokenMap.set(null); - } - - public DecoratedKey decorateKey(ByteBuffer key) - { - return partitioner.decorateKey(key); - } - - /** - * Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints - * in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy. - */ - public static class Topology - { - /** multi-map of DC to endpoints in that DC */ - private final ImmutableMultimap<String, InetAddressAndPort> dcEndpoints; - /** map of DC to multi-map of rack to endpoints in that rack */ - private final ImmutableMap<String, ImmutableMultimap<String, InetAddressAndPort>> dcRacks; - /** reverse-lookup map for endpoint to current known dc/rack assignment */ - private final ImmutableMap<InetAddressAndPort, Pair<String, String>> currentLocations; - private final Supplier<IEndpointSnitch> snitchSupplier; - - private Topology(Builder builder) - { - this.dcEndpoints = ImmutableMultimap.copyOf(builder.dcEndpoints); - - ImmutableMap.Builder<String, ImmutableMultimap<String, InetAddressAndPort>> dcRackBuilder = ImmutableMap.builder(); - for (Map.Entry<String, Multimap<String, InetAddressAndPort>> entry : builder.dcRacks.entrySet()) - dcRackBuilder.put(entry.getKey(), ImmutableMultimap.copyOf(entry.getValue())); - this.dcRacks = dcRackBuilder.build(); - - this.currentLocations = ImmutableMap.copyOf(builder.currentLocations); - this.snitchSupplier = builder.snitchSupplier; - } - - /** - * @return multi-map of DC to endpoints in that DC - */ - public Multimap<String, InetAddressAndPort> getDatacenterEndpoints() - { - return dcEndpoints; - } - - /** - * @return map of DC to multi-map of rack to endpoints in that rack - */ - public ImmutableMap<String, ImmutableMultimap<String, InetAddressAndPort>> getDatacenterRacks() - { - return dcRacks; - } - - /** - * @return The DC and rack of the given endpoint. 
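[Aside on the Topology class above, which tracks three views that must stay consistent: DC -> endpoints, DC -> (rack -> endpoints), and endpoint -> (DC, rack). A simplified, hypothetical sketch using plain strings and mutable Guava collections in place of InetAddressAndPort and the immutable variants.]

    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.Multimap;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical, simplified bookkeeping mirroring Topology's three views.
    class RackAwareDirectory
    {
        private final Multimap<String, String> dcEndpoints = HashMultimap.create();
        private final Map<String, Multimap<String, String>> dcRacks = new HashMap<>();
        private final Map<String, String[]> currentLocations = new HashMap<>();

        void add(String endpoint, String dc, String rack)
        {
            // If the endpoint moved DC/rack, drop the stale entries first.
            if (currentLocations.containsKey(endpoint))
                remove(endpoint);

            dcEndpoints.put(dc, endpoint);
            dcRacks.computeIfAbsent(dc, k -> HashMultimap.create()).put(rack, endpoint);
            currentLocations.put(endpoint, new String[]{ dc, rack });
        }

        void remove(String endpoint)
        {
            String[] current = currentLocations.remove(endpoint);
            if (current == null)
                return;
            dcEndpoints.remove(current[0], endpoint);
            dcRacks.get(current[0]).remove(current[1], endpoint);
        }
    }
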
- */ - public Pair<String, String> getLocation(InetAddressAndPort addr) - { - return currentLocations.get(addr); - } - - Builder unbuild() - { - return new Builder(this); - } - - static Builder builder(Supplier<IEndpointSnitch> snitchSupplier) - { - return new Builder(snitchSupplier); - } - - static Topology empty() - { - return builder(() -> DatabaseDescriptor.getEndpointSnitch()).build(); - } - - private static class Builder - { - /** multi-map of DC to endpoints in that DC */ - private final Multimap<String, InetAddressAndPort> dcEndpoints; - /** map of DC to multi-map of rack to endpoints in that rack */ - private final Map<String, Multimap<String, InetAddressAndPort>> dcRacks; - /** reverse-lookup map for endpoint to current known dc/rack assignment */ - private final Map<InetAddressAndPort, Pair<String, String>> currentLocations; - private final Supplier<IEndpointSnitch> snitchSupplier; - - Builder(Supplier<IEndpointSnitch> snitchSupplier) - { - this.dcEndpoints = HashMultimap.create(); - this.dcRacks = new HashMap<>(); - this.currentLocations = new HashMap<>(); - this.snitchSupplier = snitchSupplier; - } - - Builder(Topology from) - { - this.dcEndpoints = HashMultimap.create(from.dcEndpoints); - - this.dcRacks = Maps.newHashMapWithExpectedSize(from.dcRacks.size()); - for (Map.Entry<String, ImmutableMultimap<String, InetAddressAndPort>> entry : from.dcRacks.entrySet()) - dcRacks.put(entry.getKey(), HashMultimap.create(entry.getValue())); - - this.currentLocations = new HashMap<>(from.currentLocations); - this.snitchSupplier = from.snitchSupplier; - } - - /** - * Stores current DC/rack assignment for ep - */ - Builder addEndpoint(InetAddressAndPort ep) - { - String dc = snitchSupplier.get().getDatacenter(ep); - String rack = snitchSupplier.get().getRack(ep); - Pair<String, String> current = currentLocations.get(ep); - if (current != null) - { - if (current.left.equals(dc) && current.right.equals(rack)) - return this; - doRemoveEndpoint(ep, current); - } - - doAddEndpoint(ep, dc, rack); - return this; - } - - private void doAddEndpoint(InetAddressAndPort ep, String dc, String rack) - { - dcEndpoints.put(dc, ep); - - if (!dcRacks.containsKey(dc)) - dcRacks.put(dc, HashMultimap.<String, InetAddressAndPort>create()); - dcRacks.get(dc).put(rack, ep); - - currentLocations.put(ep, Pair.create(dc, rack)); - } - - /** - * Removes current DC/rack assignment for ep - */ - Builder removeEndpoint(InetAddressAndPort ep) - { - if (!currentLocations.containsKey(ep)) - return this; - - doRemoveEndpoint(ep, currentLocations.remove(ep)); - return this; - } - - private void doRemoveEndpoint(InetAddressAndPort ep, Pair<String, String> current) - { - dcRacks.get(current.left).remove(current.right, ep); - dcEndpoints.remove(current.left, ep); - } - - Builder updateEndpoint(InetAddressAndPort ep) - { - IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - if (snitch == null || !currentLocations.containsKey(ep)) - return this; - - updateEndpoint(ep, snitch); - return this; - } - - Builder updateEndpoints() - { - IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - if (snitch == null) - return this; - - for (InetAddressAndPort ep : currentLocations.keySet()) - updateEndpoint(ep, snitch); - - return this; - } - - private void updateEndpoint(InetAddressAndPort ep, IEndpointSnitch snitch) - { - Pair<String, String> current = currentLocations.get(ep); - String dc = snitch.getDatacenter(ep); - String rack = snitch.getRack(ep); - if (dc.equals(current.left) && rack.equals(current.right)) - 
return; - - doRemoveEndpoint(ep, current); - doAddEndpoint(ep, dc, rack); - } - - Topology build() - { - return new Topology(this); - } - } - } - - // Methods of org.apache.cassandra.tcm.compatibility.AsEndpoints - @Override - public NodeId peerId(InetAddressAndPort endpoint) - { - return new NodeId(getHostId(endpoint)); - } - - @Override - public InetAddressAndPort endpoint(NodeId id) - { - return getEndpointForHostId(id.uuid); - } - - // Methods of org.apache.cassandra.tcm.compatibility.AsLocations - @Override - public Location location(NodeId peer) - { - Pair<String, String> dcRack = topology.getLocation(endpoint(peer)); - return new Location(dcRack.left, dcRack.right); - } - - @Override - public Set<InetAddressAndPort> datacenterEndpoints(String datacenter) - { - return new HashSet<>(topology.dcEndpoints.get(datacenter)); - } - - @Override - public Multimap<String, InetAddressAndPort> allDatacenterEndpoints() - { - return topology.getDatacenterEndpoints(); - } - - @Override - public Multimap<String, InetAddressAndPort> datacenterRacks(String datacenter) - { - return topology.dcRacks.get(datacenter); - } - - @Override - public Map<String, Multimap<String, InetAddressAndPort>> allDatacenterRacks() - { - // terrible, but temporary - Map<String, Multimap<String, InetAddressAndPort>> dcRacks = new HashMap<>(); - topology.getDatacenterRacks().forEach((dc, racks) -> { - dcRacks.put(dc, HashMultimap.create(racks)); - }); - return dcRacks; - } - - // Methods of org.apache.cassandra.tcm.compatibility.AsTokenMap - @Override - public IPartitioner partitioner() - { - return partitioner; - } - - @Override - public ImmutableList<Token> tokens() - { - return ImmutableList.copyOf(sortedTokens); - } - - @Override - public NodeId owner(Token token) - { - return new NodeId(getHostId(getEndpoint(token))); - } - -} diff --git a/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java b/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java deleted file mode 100644 index 0221f1e22f..0000000000 --- a/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.locator; - -import org.apache.cassandra.diag.DiagnosticEventService; -import org.apache.cassandra.locator.TokenMetadataEvent.TokenMetadataEventType; - -/** - * Utility methods for events related to {@link TokenMetadata} changes. 
- */ -final class TokenMetadataDiagnostics -{ - private static final DiagnosticEventService service = DiagnosticEventService.instance(); - - private TokenMetadataDiagnostics() - { - } - - static void pendingRangeCalculationStarted(TokenMetadata tokenMetadata, String keyspace) - { - if (isEnabled(TokenMetadataEventType.PENDING_RANGE_CALCULATION_STARTED)) - service.publish(new TokenMetadataEvent(TokenMetadataEventType.PENDING_RANGE_CALCULATION_STARTED, tokenMetadata, keyspace)); - } - - private static boolean isEnabled(TokenMetadataEventType type) - { - return service.isEnabled(TokenMetadataEvent.class, type); - } - -} diff --git a/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java b/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java deleted file mode 100644 index c3ed074b33..0000000000 --- a/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.locator; - -import java.io.Serializable; -import java.util.HashMap; - -import org.apache.cassandra.diag.DiagnosticEvent; - -/** - * Events related to {@link TokenMetadata} changes. 
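[Aside on TokenMetadataDiagnostics above, which checks isEnabled before constructing and publishing an event: a self-contained sketch of that guard-then-publish shape with hypothetical types; it does not reproduce the real DiagnosticEventService API beyond the two calls shown above.]

    import java.util.function.Consumer;
    import java.util.function.Predicate;

    // Hypothetical sink; only the "check before build/publish" shape mirrors the class above.
    final class RingDiagnostics
    {
        private final Predicate<String> isEnabled;
        private final Consumer<String> publish;

        RingDiagnostics(Predicate<String> isEnabled, Consumer<String> publish)
        {
            this.isEnabled = isEnabled;
            this.publish = publish;
        }

        void pendingRangeCalculationStarted(String keyspace)
        {
            // Guard first so the event payload (which above includes a full
            // TokenMetadata.toString()) is only built when a subscriber exists.
            if (isEnabled.test("PENDING_RANGE_CALCULATION_STARTED"))
                publish.accept("pending range calculation started for " + keyspace);
        }
    }
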
- */ -public final class TokenMetadataEvent extends DiagnosticEvent -{ - - public enum TokenMetadataEventType - { - PENDING_RANGE_CALCULATION_STARTED, - PENDING_RANGE_CALCULATION_COMPLETED, - } - - private final TokenMetadataEventType type; - private final TokenMetadata tokenMetadata; - private final String keyspace; - - TokenMetadataEvent(TokenMetadataEventType type, TokenMetadata tokenMetadata, String keyspace) - { - this.type = type; - this.tokenMetadata = tokenMetadata; - this.keyspace = keyspace; - } - - public TokenMetadataEventType getType() - { - return type; - } - - public HashMap<String, Serializable> toMap() - { - // be extra defensive against nulls and bugs - HashMap<String, Serializable> ret = new HashMap<>(); - ret.put("keyspace", keyspace); - ret.put("tokenMetadata", tokenMetadata.toString()); - return ret; - } -} diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index efaff9d67d..b6090bfd2d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -171,7 +171,6 @@ import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.locator.SystemReplicas; -import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.metrics.Sampler; import org.apache.cassandra.metrics.SamplingManager; import org.apache.cassandra.metrics.StorageMetrics; diff --git a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java index 9829d56021..738b6be220 100644 --- a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java +++ b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java @@ -45,6 +45,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -57,12 +58,12 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat.Components; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.stress.generate.PartitionGenerator; import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.operations.userdefined.SchemaInsert; import org.apache.cassandra.stress.settings.StressSettings; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tools.nodetool.CompactionStats; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -125,7 +126,7 @@ public abstract class CompactionStress implements Runnable ColumnFamilyStore initCf(StressProfile stressProfile, boolean loadSSTables) { - generateTokens(stressProfile.seedStr, StorageService.instance.getTokenMetadata(), numTokens); + generateTokens(stressProfile.seedStr, numTokens); 
         CreateTableStatement.Raw createStatement = stressProfile.getCreateStatement();
         List<File> dataDirectories = getDataDirectories();
@@ -192,12 +193,12 @@ public abstract class CompactionStress implements Runnable
      * We need consistency to write and compact the same data offline
      * in the case of a range aware sstable writer.
      */
-    private void generateTokens(String seed, TokenMetadata tokenMetadata, Integer numTokens)
+    private void generateTokens(String seed, Integer numTokens)
     {
         Random random = new Random(seed.hashCode());
-        IPartitioner p = tokenMetadata.partitioner;
-        tokenMetadata.clearUnsafe();
+        IPartitioner p = ClusterMetadata.current().tokenMap.partitioner();
+        // tokenMetadata.clearUnsafe();
         for (int i = 1; i <= numTokens; i++)
         {
             InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
@@ -205,7 +206,7 @@
             for (int j = 0; j < numTokens; ++j)
                 tokens.add(p.getRandomToken(random));
 
-            tokenMetadata.updateNormalTokens(tokens, addr);
+//            tokenMetadata.updateNormalTokens(tokens, addr);
         }
     }
@@ -224,6 +225,7 @@
     public void run()
     {
+        ClusterMetadataService.initializeForTools(true);
         //Setup
         CompactionManager.instance.setMaximumCompactorThreads(threads);
         CompactionManager.instance.setCoreCompactorThreads(threads);
@@ -300,6 +302,8 @@
     public void run()
     {
+        ClusterMetadataService.initializeForTools(true);
+        Keyspace.setInitialized();
         StressProfile stressProfile = getStressProfile();
         ColumnFamilyStore cfs = initCf(stressProfile, false);
         Directories directories = cfs.getDirectories();

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
